优化任务数据格式

This commit is contained in:
ChenYi 2025-04-18 11:31:23 +08:00
parent dc7416bdbf
commit 6c2560669e
9 changed files with 155 additions and 191 deletions

View File

@ -64,7 +64,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
//await dbContext.InitAmmeterCacheData(); await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData(); //await dbContext.InitWatermeterCacheData();
//初始化主题信息 //初始化主题信息

View File

@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
@ -25,7 +26,6 @@ using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Interface;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
@ -182,7 +182,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) => processor: (data, groupIndex) =>
{ {
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
} }
); );
@ -306,8 +306,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop(); timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
return; //return;
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif
@ -575,8 +575,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{ {
Stopwatch stopwatch = new Stopwatch(); var stopwatch = Stopwatch.StartNew();
stopwatch.Start();
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
@ -589,97 +588,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
MaxDegreeOfParallelism = recommendedThreads, MaxDegreeOfParallelism = recommendedThreads,
}; };
string taskBatch = "20250417155016"; string taskBatch = "20250417171649";
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{ {
Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>(); _ = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
}
);
} while (hasNext);
}); });
stopwatch.Stop();
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); await Task.CompletedTask;
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
// return;
//}
////获取下发任务缓存数据
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
// return;
//}
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
////将取出的缓存任务数据发送到Kafka消息队列中
//foreach (var focusItem in meterTaskInfos)
//{
// foreach (var ammerterItem in focusItem.Value)
// {
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
// {
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
// MessageId = ammerterItem.Value.IssuedMessageId,
// FocusAddress = ammerterItem.Value.FocusAddress,
// TimeDensity = timeDensity.ToString(),
// };
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// //_ = _producerBus.Publish(tempMsg);
// meterTaskInfosList.Add(ammerterItem.Value);
// }
//}
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
//{
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
//}
//stopwatch.Stop();
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
} }
/// <summary> /// <summary>
/// 电表创建发布任务 /// 电表创建发布任务
@ -697,7 +622,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
@ -781,7 +706,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
//Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes) foreach (var tempItem in tempCodes)
@ -801,11 +725,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var aFNStr = itemCodeArr[0]; var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec(); var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]); var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null; TelemetryPacketResponse builderResponse = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.) if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{ {
//实时数据 //实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
} }
else else
{ {
@ -814,7 +743,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler)) , out var handler))
{ {
dataInfos = handler(new TelemetryPacketRequest() builderResponse = handler(new TelemetryPacketRequest()
{ {
FocusAddress = ammeterInfo.FocusAddress, FocusAddress = ammeterInfo.FocusAddress,
Fn = fn, Fn = fn,
@ -829,7 +758,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//TODO:特殊表 //TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0) if (builderResponse == null || builderResponse.Data.Length <= 0)
{ {
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue; continue;
@ -850,29 +779,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusId = ammeterInfo.FocusId, FocusId = ammeterInfo.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem, ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
IsSend = false,
ManualOrNot = false, ManualOrNot = false,
Pn = ammeterInfo.MeteringCode, Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos), IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
}; };
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// pipe.HSet(redisCacheKey, keyValuePairs);
// object[] ret = pipe.EndPipe();
//}
if (taskList == null if (taskList == null
|| taskList.Count() <= 0 || taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
@ -914,7 +835,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param> /// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns> /// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName, private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingTelemetryPacketInfo taskRecord,int partition) MeterReadingTelemetryPacketInfo taskRecord, int partition)
{ {
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{ {
@ -924,58 +845,38 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _producerService.ProduceAsync(topicName, partition, taskRecord); await _producerService.ProduceAsync(topicName, partition, taskRecord);
} }
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) private async Task GetTaskInfoListToKafka(
string redisCacheTelemetryPacketInfoHashKey,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
{ {
var currentDateTime = DateTime.Now; decimal? cursor = null;
string member = null;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); bool hasNext;
do
//FreeRedisProvider.Instance.key()
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
return; redisCacheTelemetryPacketInfoHashKey,
} redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
//获取下发任务缓存数据 cursor = page.HasNext ? page.NextScore : null;
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); member = page.HasNext ? page.NextMember : null;
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) hasNext = page.HasNext;
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>(); await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: page.Items,
//将取出的缓存任务数据发送到Kafka消息队列中 deviceIdSelector: data => data.FocusAddress,
foreach (var focusItem in meterTaskInfos) processor: (data, groupIndex) =>
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{ {
MessageHexString = ammerterItem.Value.IssuedMessageHexString, _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
MessageId = ammerterItem.Value.IssuedMessageId, }
FocusAddress = ammerterItem.Value.FocusAddress, );
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); } while (hasNext);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
} }
#endregion #endregion

View File

@ -34,7 +34,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 任务数据唯一标记 /// 任务数据唯一标记
/// </summary> /// </summary>
public string TaskMark { get; set; } public decimal TaskMark { get; set; }
/// <summary> /// <summary>
/// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。 /// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。
@ -96,7 +96,21 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 采集项编码 /// 采集项编码
/// </summary> /// </summary>
public string ItemCode { get; set;} public string ItemCode { get; set;}
/// <summary>
/// 帧序列域SEQ
/// </summary>
public required Seq Seq { get; set; }
/// <summary>
/// 地址域A3的主站地址MSA
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 是否发送
/// </summary>
public bool IsSend { get; set; }
/// <summary> /// <summary>
/// 创建时间 /// 创建时间
@ -132,6 +146,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 上报报文解析备注,异常情况下才有 /// 上报报文解析备注,异常情况下才有
/// </summary> /// </summary>
public string ReceivedRemark { get; set; } public string ReceivedRemark { get; set; }
/// <summary>
/// 是否已上报
/// </summary>
public bool IsReceived { get; set; }
//public void CreateDataId(Guid Id) //public void CreateDataId(Guid Id)
//{ //{

View File

@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// <param name="request.FocusAddress"></param> /// <param name="request.FocusAddress"></param>
/// <param name="request.Fn"></param> /// <param name="request.Fn"></param>
/// <param name="request.Pn"></param> /// <param name="request.Pn"></param>
public delegate byte[] AFNDelegate(TelemetryPacketRequest request); public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request);
/// <summary> /// <summary>
/// 编码与方法的映射表 /// 编码与方法的映射表
@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
} }
#region AFN_00H #region AFN_00H
public static byte[] AFN00_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -64,13 +64,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_01H #region AFN_01H
public static byte[] AFN01_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -89,13 +89,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_02H #region AFN_02H
public static byte[] AFN02_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -114,12 +114,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_04H #region AFN_04H
public static byte[] AFN04_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -138,13 +138,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_05H #region AFN_05H
public static byte[] AFN05_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -163,12 +163,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_09H #region AFN_09H
public static byte[] AFN09_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -187,13 +187,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_0AH #region AFN_0AH
public static byte[] AFN0A_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -212,12 +212,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_0CH #region AFN_0CH
public static byte[] AFN0C_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -236,12 +236,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN_0DH #region AFN_0DH
public static byte[] AFN0D_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -260,12 +260,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#endregion #endregion
#region AFN10H #region AFN10H
public static byte[] AFN10_Fn_Send(TelemetryPacketRequest request) public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request)
{ {
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
@ -283,8 +283,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Pn = request.Pn, Pn = request.Pn,
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit);
return bytes; return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
} }
#region SpecialAmmeter #region SpecialAmmeter

View File

@ -0,0 +1,30 @@
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.BuildSendDatas
{
/// <summary>
/// 报文构建返回结果
/// </summary>
public class TelemetryPacketResponse
{
/// <summary>
/// 帧序列域SEQ
/// </summary>
public required Seq Seq { get; set; }
/// <summary>
/// 地址域A3的主站地址MSA
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 报文体
/// </summary>
public required byte[] Data { get; set; }
}
}

View File

@ -11,25 +11,31 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary> /// </summary>
public class CommonConst public class CommonConst
{ {
/// <summary>
/// 服务器标识
/// </summary>
public const string ServerTagName = "ServerTagName";
/// <summary> /// <summary>
/// Kafka /// Kafka
/// </summary> /// </summary>
public const string Kafka = "Kafka"; public const string Kafka = "Kafka";
/// <summary>
/// 服务器标识
/// </summary>
public const string ServerTagName = $"{Kafka}:ServerTagName";
/// <summary> /// <summary>
/// Kafka副本数量 /// Kafka副本数量
/// </summary> /// </summary>
public const string KafkaReplicationFactor = "KafkaReplicationFactor"; public const string KafkaReplicationFactor = $"{Kafka}:KafkaReplicationFactor";
/// <summary> /// <summary>
/// Kafka主题分区数量 /// Kafka主题分区数量
/// </summary> /// </summary>
public const string NumPartitions = "NumPartitions"; public const string NumPartitions = $"{Kafka}:NumPartitions";
/// <summary>
/// 首次采集时间
/// </summary>
public const string FirstCollectionTime = "FirstCollectionTime";
} }
} }

View File

@ -669,7 +669,7 @@ namespace JiShe.CollectBus.Common.Helpers
return att == null ? field.Name : ((DescriptionAttribute)att).Description; return att == null ? field.Name : ((DescriptionAttribute)att).Description;
} }
/// <summary> /// <summary>
/// 将传入的字符串中间部分字符替换成特殊字符 /// 将传入的字符串中间部分字符替换成特殊字符
@ -759,7 +759,7 @@ namespace JiShe.CollectBus.Common.Helpers
} }
return fontValue; return fontValue;
} }
/// <summary> /// <summary>
/// 获取任务标识 /// 获取任务标识
@ -767,10 +767,13 @@ namespace JiShe.CollectBus.Common.Helpers
/// <param name="afn"></param> /// <param name="afn"></param>
/// <param name="fn"></param> /// <param name="fn"></param>
/// <param name="pn"></param> /// <param name="pn"></param>
/// <param name="msa"></param>
/// <returns></returns> /// <returns></returns>
public static string GetTaskMark(int afn,int fn,int pn) public static decimal GetTaskMark(int afn, int fn, int pn, int msa)
{ {
return $"{afn.ToString().PadLeft(2,'0')}{fn}{pn}"; var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
return Convert.ToInt32(makstr) << 32 | msa;
} }
} }
} }

View File

@ -30,5 +30,10 @@ namespace JiShe.CollectBus.Common.Models
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳 /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary> /// </summary>
public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId; public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId;
/// <summary>
/// 是否已处理
/// </summary>
public virtual bool IsHandle { get; set; } = false;
} }
} }

View File

@ -84,7 +84,7 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus2" "ServerTagName": "JiSheCollectBus3"
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
@ -95,7 +95,6 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus3",
"Cassandra": { "Cassandra": {
"ReplicationStrategy": { "ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy "Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy
@ -144,5 +143,6 @@
"SerialConsistencyLevel": "Serial", "SerialConsistencyLevel": "Serial",
"DefaultIdempotence": true "DefaultIdempotence": true
} }
} },
"FirstCollectionTime": "2025-04-18 00:00:00"
} }