优化消费流程
This commit is contained in:
parent
1ae875606f
commit
cdcf078e5a
@ -95,7 +95,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
DeviceId = $"{data.DeviceId}",
|
DeviceId = $"{data.DeviceId}",
|
||||||
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
|
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
|
||||||
ProjectId = $"{data.ProjectId}",
|
ProjectId = $"{data.ProjectId}",
|
||||||
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||||
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
|
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
|
||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
||||||
@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
DeviceId = $"{item.DeviceId}",
|
DeviceId = $"{item.DeviceId}",
|
||||||
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
|
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
|
||||||
ProjectId = $"{item.ProjectId}",
|
ProjectId = $"{item.ProjectId}",
|
||||||
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整
|
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整
|
||||||
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
|
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
|
||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
||||||
@ -275,7 +275,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
if (!data.TimeSpan.HasValue)
|
if (!data.TimeSpan.HasValue)
|
||||||
data.TimeSpan = analysisBaseDto.ReceivedTime;
|
data.TimeSpan = analysisBaseDto.ReceivedTime;
|
||||||
// 类型(心跳,登录,上电,掉电)
|
// 类型(心跳,登录,上电,掉电)
|
||||||
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeMilliseconds();
|
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
|
||||||
var treeData = new TreeModelSingleMeasuringEntity<T>()
|
var treeData = new TreeModelSingleMeasuringEntity<T>()
|
||||||
{
|
{
|
||||||
SystemName = _applicationOptions.SystemType,
|
SystemName = _applicationOptions.SystemType,
|
||||||
|
|||||||
@ -170,6 +170,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
// await _deviceRepository.UpdateAsync(entity);
|
// await _deviceRepository.UpdateAsync(entity);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
|
||||||
var messageReceivedLoginEvent = new MessageReceivedLogin
|
var messageReceivedLoginEvent = new MessageReceivedLogin
|
||||||
{
|
{
|
||||||
ClientId = code,
|
ClientId = code,
|
||||||
@ -180,7 +181,8 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
MessageId = Guid.NewGuid().ToString(),
|
MessageId = Guid.NewGuid().ToString(),
|
||||||
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
||||||
};
|
};
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
//await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
||||||
|
|
||||||
var reqParam = new ReqParameter2
|
var reqParam = new ReqParameter2
|
||||||
{
|
{
|
||||||
AFN = AFN.确认或否认,
|
AFN = AFN.确认或否认,
|
||||||
@ -211,7 +213,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
{
|
{
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
|
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
|
// await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -266,7 +268,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
MessageId = Guid.NewGuid().ToString(),
|
MessageId = Guid.NewGuid().ToString(),
|
||||||
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
||||||
};
|
};
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
// await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
||||||
|
|
||||||
var reqParam = new ReqParameter2()
|
var reqParam = new ReqParameter2()
|
||||||
{
|
{
|
||||||
@ -299,7 +301,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
{
|
{
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
|
||||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
|
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
|
// await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -62,21 +62,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
|
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
|
||||||
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
bool isAck = true;
|
||||||
foreach (var issuedEventMessage in issuedEventMessages)
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
{
|
{
|
||||||
var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
//var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
if (loginEntity == null)
|
//if (loginEntity == null)
|
||||||
{
|
//{
|
||||||
isAck=false;
|
// isAck=false;
|
||||||
break;
|
// break;
|
||||||
}
|
//}
|
||||||
|
|
||||||
loginEntity.AckTime = Clock.Now;
|
|
||||||
loginEntity.IsAck = true;
|
|
||||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
|
||||||
isAck = true;
|
|
||||||
|
|
||||||
|
//loginEntity.AckTime = Clock.Now;
|
||||||
|
//loginEntity.IsAck = true;
|
||||||
|
//await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
}
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
||||||
@ -85,19 +83,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
||||||
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
bool isAck = true;
|
||||||
foreach (var issuedEventMessage in issuedEventMessages)
|
//foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
{
|
//{
|
||||||
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
// var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
if (heartbeatEntity == null)
|
// if (heartbeatEntity == null)
|
||||||
{
|
// {
|
||||||
isAck = false;
|
// isAck = false;
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
heartbeatEntity.AckTime = Clock.Now;
|
// heartbeatEntity.AckTime = Clock.Now;
|
||||||
heartbeatEntity.IsAck = true;
|
// heartbeatEntity.IsAck = true;
|
||||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
// await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
}
|
// }
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -67,10 +67,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
//[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
|
|
||||||
return await SendMessagesAsync(receivedMessage);
|
return await SendMessagesAsync(receivedMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user