Merge branch 'feature_定时抄读_18_CY' into dev
This commit is contained in:
commit
c5c428f551
@ -135,135 +135,6 @@ namespace JiShe.CollectBus.Protocol.T6452007
|
||||
result.IsSuccess = true;
|
||||
|
||||
return await Task.FromResult(result);
|
||||
}
|
||||
|
||||
|
||||
#region 上行命令
|
||||
|
||||
//68
|
||||
//32 00
|
||||
//32 00
|
||||
//68
|
||||
//C9 1100'1001. 控制域C。
|
||||
// D7=1, (终端发送)上行方向。
|
||||
// D6=1, 此帧来自启动站。
|
||||
// D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。
|
||||
// D4=0, 保留
|
||||
// D3~D0=9, 功能码。链路测试
|
||||
|
||||
//20 32 行政区划码
|
||||
//90 26 终端地址
|
||||
//00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2
|
||||
// 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0.
|
||||
//02 应用层功能码。AFN=2, 链路接口检测
|
||||
//70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。
|
||||
//00 00 信息点。DA1和DA2全为“0”时,表示终端信息点。
|
||||
//01 00 信息类。F1, 登录。
|
||||
//44 帧尾,包含用户区数据校验和
|
||||
//16 帧结束标志
|
||||
|
||||
/// <summary>
|
||||
/// 解析上行命令
|
||||
/// </summary>
|
||||
/// <param name="cmd"></param>
|
||||
/// <returns></returns>
|
||||
public CommandReulst? AnalysisCmd(string cmd)
|
||||
{
|
||||
CommandReulst? commandReulst = null;
|
||||
var hexStringList = cmd.StringToPairs();
|
||||
|
||||
if (hexStringList.Count < hearderLen)
|
||||
{
|
||||
return commandReulst;
|
||||
}
|
||||
//验证起始字符
|
||||
if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr())
|
||||
{
|
||||
return commandReulst;
|
||||
}
|
||||
|
||||
var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}";
|
||||
var lenBin = lenHexStr.HexToBin();
|
||||
var len = lenBin.Remove(lenBin.Length - 2).BinToDec();
|
||||
//验证长度
|
||||
if (hexStringList.Count - 2 != hearderLen + len)
|
||||
return commandReulst;
|
||||
|
||||
var userDataIndex = hearderLen;
|
||||
var c = hexStringList[userDataIndex];//控制域 1字节
|
||||
userDataIndex += 1;
|
||||
|
||||
var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节
|
||||
var a = AnalysisA(aHexList);
|
||||
var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0');
|
||||
var mSA = a3Bin.Substring(0, 7).BinToDec();
|
||||
userDataIndex += 5;
|
||||
|
||||
var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节
|
||||
userDataIndex += 1;
|
||||
|
||||
var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0');
|
||||
var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1));
|
||||
var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2));
|
||||
var cON = (CON)Convert.ToInt32(seq.Substring(3, 1));
|
||||
var prseqBin = seq.Substring(4, 4);
|
||||
userDataIndex += 1;
|
||||
|
||||
// (DA2 - 1) * 8 + DA1 = pn
|
||||
var da1Bin = hexStringList[userDataIndex].HexToBin();
|
||||
var da1 = da1Bin == "0" ? 0 : da1Bin.Length;
|
||||
userDataIndex += 1;
|
||||
var da2 = hexStringList[userDataIndex].HexToDec();
|
||||
var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;
|
||||
userDataIndex += 1;
|
||||
//(DT2*8)+DT1=fn
|
||||
var dt1Bin = hexStringList[userDataIndex].HexToBin();
|
||||
var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0;
|
||||
userDataIndex += 1;
|
||||
var dt2 = hexStringList[userDataIndex].HexToDec();
|
||||
var fn = dt2 * 8 + dt1;
|
||||
userDataIndex += 1;
|
||||
|
||||
//数据单元
|
||||
var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList();
|
||||
|
||||
//EC
|
||||
//Tp
|
||||
commandReulst = new CommandReulst()
|
||||
{
|
||||
A = a,
|
||||
MSA = mSA,
|
||||
AFN = aFN,
|
||||
Seq = new Seq()
|
||||
{
|
||||
TpV = tpV,
|
||||
FIRFIN = fIRFIN,
|
||||
CON = cON,
|
||||
PRSEQ = prseqBin.BinToDec(),
|
||||
},
|
||||
CmdLength = len,
|
||||
Pn = pn,
|
||||
Fn = fn,
|
||||
HexDatas = datas
|
||||
};
|
||||
|
||||
return commandReulst;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 解析地址
|
||||
/// </summary>
|
||||
/// <param name="aHexList"></param>
|
||||
/// <returns></returns>
|
||||
private string AnalysisA(List<string> aHexList)
|
||||
{
|
||||
var a1 = aHexList[1] + aHexList[0];
|
||||
var a2 = aHexList[3] + aHexList[2];
|
||||
var a2Dec = a2.HexToDec();
|
||||
var a3 = aHexList[4];
|
||||
var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}";
|
||||
return a;
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,9 +77,32 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
/// 电表自动校时
|
||||
/// </summary>
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="groupIndex">集中器所在分组</param>
|
||||
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
|
||||
|
||||
/// <summary>
|
||||
/// 日冻结抄读
|
||||
/// </summary>
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="groupIndex">集中器所在分组</param>
|
||||
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
|
||||
|
||||
/// <summary>
|
||||
/// 月冻结数据抄读
|
||||
/// </summary>
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="groupIndex">集中器所在分组</param>
|
||||
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
|
||||
|
||||
#endregion
|
||||
|
||||
#region 水表采集处理
|
||||
|
||||
@ -149,8 +149,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticGetTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
|
||||
{
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
|
||||
{
|
||||
_ = CreateMeterPublishTask<AmmeterInfo>(
|
||||
timeDensity: timeDensity,
|
||||
nextTaskTime: currentTime,
|
||||
@ -160,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticGetTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
|
||||
{
|
||||
_ = CreateMeterPublishTask<AmmeterInfo>(
|
||||
timeDensity: timeDensity,
|
||||
@ -171,6 +171,28 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
|
||||
{
|
||||
_ = CreateMeterPublishTask<AmmeterInfo>(
|
||||
timeDensity: timeDensity,
|
||||
nextTaskTime: currentTime,
|
||||
meterType: MeterTypeEnum.Ammeter,
|
||||
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
|
||||
{
|
||||
await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
|
||||
{
|
||||
_ = CreateMeterPublishTask<AmmeterInfo>(
|
||||
timeDensity: timeDensity,
|
||||
nextTaskTime: currentTime,
|
||||
meterType: MeterTypeEnum.Ammeter,
|
||||
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
|
||||
{
|
||||
await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
|
||||
@ -236,12 +258,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
|
||||
tasksToBeIssueModel.LastTaskTime = currentTaskTime;
|
||||
tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
|
||||
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
||||
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
||||
}
|
||||
|
||||
//电表定时阀控任务处理。
|
||||
_= AmmeterScheduledAutoValveControl();
|
||||
|
||||
_ = AmmeterScheduledAutoValveControl();
|
||||
|
||||
}
|
||||
|
||||
#region 电表采集处理
|
||||
@ -681,34 +703,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = timestamps,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = tempItem,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: tempItem,
|
||||
subItemCode: null,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
}
|
||||
|
||||
@ -750,12 +752,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
try
|
||||
{
|
||||
//判断是否是自动校时时间
|
||||
if (!string.Equals(currentTimeStr , _applicationOptions.AutomaticVerificationTime,StringComparison.CurrentCultureIgnoreCase))
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
{
|
||||
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
|
||||
@ -783,33 +785,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
}
|
||||
});
|
||||
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = currentTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = itemCode,
|
||||
SubItemCode = subItemCode,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: itemCode,
|
||||
subItemCode: subItemCode,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
@ -839,7 +822,171 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 日冻结抄读
|
||||
/// </summary>
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="groupIndex">集中器所在分组</param>
|
||||
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
|
||||
{
|
||||
var currentTime = DateTime.Now;
|
||||
string currentTimeStr = $"{currentTime:HH:mm:00}";
|
||||
|
||||
try
|
||||
{
|
||||
//判断是否是自动校时时间
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
{
|
||||
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
|
||||
return;
|
||||
}
|
||||
|
||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
|
||||
|
||||
//根据电表型号获取协议插件
|
||||
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
|
||||
if (protocolPlugin == null)
|
||||
{
|
||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var item in DayFreezeCodes)
|
||||
{
|
||||
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
|
||||
{
|
||||
FocusAddress = ammeterInfo.FocusAddress,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
ItemCode = item
|
||||
});
|
||||
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: item,
|
||||
subItemCode: null,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
}
|
||||
|
||||
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
|
||||
return;
|
||||
}
|
||||
|
||||
//任务记录入库
|
||||
await _dbProvider.BatchInsertAsync(metadata, taskList);
|
||||
|
||||
//任务信息推送Kafka
|
||||
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
||||
items: taskList,
|
||||
deviceIdSelector: data => data.DeviceId,
|
||||
processor: (data, groupIndex) =>
|
||||
{
|
||||
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 月冻结数据抄读
|
||||
/// </summary>
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="groupIndex">集中器所在分组</param>
|
||||
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
|
||||
{
|
||||
var currentTime = DateTime.Now;
|
||||
string currentTimeStr = $"{currentTime:HH:mm:00}";
|
||||
|
||||
try
|
||||
{
|
||||
//判断是否是自动校时时间
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
{
|
||||
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
|
||||
return;
|
||||
}
|
||||
|
||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
|
||||
|
||||
//根据电表型号获取协议插件
|
||||
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
|
||||
if (protocolPlugin == null)
|
||||
{
|
||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var item in DayFreezeCodes)
|
||||
{
|
||||
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
|
||||
{
|
||||
FocusAddress = ammeterInfo.FocusAddress,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
ItemCode = item
|
||||
});
|
||||
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: item,
|
||||
subItemCode: null,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
}
|
||||
|
||||
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
|
||||
return;
|
||||
}
|
||||
|
||||
//任务记录入库
|
||||
await _dbProvider.BatchInsertAsync(metadata, taskList);
|
||||
|
||||
//任务信息推送Kafka
|
||||
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
||||
items: taskList,
|
||||
deviceIdSelector: data => data.DeviceId,
|
||||
processor: (data, groupIndex) =>
|
||||
{
|
||||
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
|
||||
@ -1022,7 +1169,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
|
||||
|
||||
|
||||
|
||||
//根据表型号获取协议插件
|
||||
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
|
||||
if (protocolPlugin == null)
|
||||
@ -1112,7 +1259,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
try
|
||||
{
|
||||
//判断是否是自动获取版本号时间
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticGetTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
{
|
||||
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间");
|
||||
return;
|
||||
@ -1145,33 +1292,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//}
|
||||
});
|
||||
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = currentTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = itemCode,
|
||||
//SubItemCode = subItemCode,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: itemCode,
|
||||
subItemCode: null,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
@ -1216,7 +1344,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
try
|
||||
{
|
||||
//判断是否是自动获取版本号时间
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticGetTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
|
||||
{
|
||||
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
|
||||
return;
|
||||
@ -1242,33 +1370,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
ItemCode = itemCode,
|
||||
});
|
||||
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = currentTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = itemCode,
|
||||
//SubItemCode = subItemCode,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: itemCode,
|
||||
subItemCode: null,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
@ -1442,6 +1551,47 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 构建报文保存对象
|
||||
/// </summary>
|
||||
/// <param name="ammeterInfo">电表信息</param>
|
||||
/// <param name="timestamps">IoTDB存储时标</param>
|
||||
/// <param name="builderResponse">报文构建返回结果</param>
|
||||
/// <param name="itemCode">端到云协议采集项编码</param>
|
||||
/// <param name="subItemCode">端到端采集项编码</param>
|
||||
/// <param name="pendingCopyReadTime">待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳</param>
|
||||
/// <param name="creationTime">数据创建时间戳</param>
|
||||
/// <returns></returns>
|
||||
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime)
|
||||
{
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
return new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = timestamps,
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = pendingCopyReadTime,
|
||||
CreationTime = creationTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = itemCode,
|
||||
SubItemCode = subItemCode,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
}
|
||||
#endregion
|
||||
|
||||
}
|
||||
|
||||
@ -274,33 +274,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
}
|
||||
});
|
||||
|
||||
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||
{
|
||||
SystemName = SystemType,
|
||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||
DeviceId = $"{ammeterInfo.MeterId}",
|
||||
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||
PendingCopyReadTime = currentTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||
AFN = builderResponse.AFn,
|
||||
Fn = builderResponse.Fn,
|
||||
Seq = builderResponse.Seq,
|
||||
MSA = builderResponse.MSA,
|
||||
ItemCode = itemCode,
|
||||
SubItemCode = subItemCode,
|
||||
TaskMark = taskMark,
|
||||
IsSend = false,
|
||||
ManualOrNot = false,
|
||||
Pn = ammeterInfo.MeteringCode,
|
||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||
IsReceived = false,
|
||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||
};
|
||||
|
||||
var meterReadingRecords = CreateAmmeterPacketInfo(
|
||||
ammeterInfo: ammeterInfo,
|
||||
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||
builderResponse: builderResponse,
|
||||
itemCode: itemCode,
|
||||
subItemCode: subItemCode,
|
||||
pendingCopyReadTime: currentTime,
|
||||
creationTime: currentTime);
|
||||
taskList.Add(meterReadingRecords);
|
||||
}
|
||||
if (taskList == null || taskList.Count <= 0)
|
||||
|
||||
@ -28,12 +28,22 @@
|
||||
/// <summary>
|
||||
/// 自动获取终端版时间
|
||||
/// </summary>
|
||||
public required string AutomaticGetTerminalVersionTime { get; set; }
|
||||
public required string AutomaticTerminalVersionTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 自动获取远程通信模块(SIM)版本时间
|
||||
/// </summary>
|
||||
public required string AutomaticGetTelematicsModuleTime { get; set; }
|
||||
public required string AutomaticTelematicsModuleTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 日冻结抄读时间
|
||||
/// </summary>
|
||||
public required string AutomaticDayFreezeTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 月冻结抄读时间
|
||||
/// </summary>
|
||||
public required string AutomaticMonthFreezeTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 默认协议插件
|
||||
|
||||
@ -146,8 +146,10 @@
|
||||
"SystemType": null,
|
||||
"FirstCollectionTime": "2025-04-22 16:07:00",
|
||||
"AutomaticVerificationTime": "16:07:00",
|
||||
"AutomaticGetTerminalVersionTime": "17:07:00",
|
||||
"AutomaticGetTelematicsModuleTime": "17:30:00",
|
||||
"AutomaticTerminalVersionTime": "17:07:00",
|
||||
"AutomaticTelematicsModuleTime": "17:30:00",
|
||||
"AutomaticDayFreezeTime": "02:30:00",
|
||||
"AutomaticMonthFreezeTime": "03:30:00",
|
||||
"DefaultProtocolPlugin": "T37612012ProtocolPlugin"
|
||||
},
|
||||
"PlugInFolder": ""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user