From cdcf078e5a7b02806ca2c8aefbfd1972db2fda87 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Wed, 7 May 2025 15:17:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E8=B4=B9=E6=B5=81?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AnalysisData/DataStorage.cs | 6 +-- .../T37612012ProtocolPlugin.cs | 10 ++-- .../Subscribers/SubscriberAppService.cs | 48 +++++++++---------- .../Subscribers/WorkerSubscriberAppService.cs | 3 +- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index 06ab962..67e1794 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -95,7 +95,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DeviceId = $"{data.DeviceId}", DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}", ProjectId = $"{data.ProjectId}", - Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), SingleMeasuring = new Tuple(data.FiledName ?? string.Empty, data.DataValue ?? default) }; _runtimeContext.UseTableSessionPool = true; // 使用表模型池 @@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DeviceId = $"{item.DeviceId}", DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}", ProjectId = $"{item.ProjectId}", - Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整 + Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整 SingleMeasuring = new Tuple(item.FiledName ?? string.Empty, item.DataValue ?? default) }; _runtimeContext.UseTableSessionPool = true; // 使用表模型池 @@ -275,7 +275,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (!data.TimeSpan.HasValue) data.TimeSpan = analysisBaseDto.ReceivedTime; // 类型(心跳,登录,上电,掉电) - long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeMilliseconds(); + long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(); var treeData = new TreeModelSingleMeasuringEntity() { SystemName = _applicationOptions.SystemType, diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs index b8568d4..9b35261 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs @@ -170,6 +170,7 @@ namespace JiShe.CollectBus.Protocol.T37612012 // await _deviceRepository.UpdateAsync(entity); //} + var messageReceivedLoginEvent = new MessageReceivedLogin { ClientId = code, @@ -180,7 +181,8 @@ namespace JiShe.CollectBus.Protocol.T37612012 MessageId = Guid.NewGuid().ToString(), ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") }; - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + //await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + var reqParam = new ReqParameter2 { AFN = AFN.确认或否认, @@ -211,7 +213,7 @@ namespace JiShe.CollectBus.Protocol.T37612012 { await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); + // await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); } } @@ -266,7 +268,7 @@ namespace JiShe.CollectBus.Protocol.T37612012 MessageId = Guid.NewGuid().ToString(), ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") }; - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); + // await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); var reqParam = new ReqParameter2() { @@ -299,7 +301,7 @@ namespace JiShe.CollectBus.Protocol.T37612012 { await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes); _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}"); - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); + // await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index a082551..670fd7c 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -62,21 +62,19 @@ namespace JiShe.CollectBus.Subscribers [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] public async Task LoginIssuedEvent(List issuedEventMessages) { - bool isAck = false; + bool isAck = true; foreach (var issuedEventMessage in issuedEventMessages) { - var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); - if (loginEntity == null) - { - isAck=false; - break; - } + //var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); + //if (loginEntity == null) + //{ + // isAck=false; + // break; + //} - loginEntity.AckTime = Clock.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - isAck = true; - + //loginEntity.AckTime = Clock.Now; + //loginEntity.IsAck = true; + //await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); @@ -85,19 +83,19 @@ namespace JiShe.CollectBus.Subscribers [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] public async Task HeartbeatIssuedEvent(List issuedEventMessages) { - bool isAck = false; - foreach (var issuedEventMessage in issuedEventMessages) - { - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); - if (heartbeatEntity == null) - { - isAck = false; - break; - } - heartbeatEntity.AckTime = Clock.Now; - heartbeatEntity.IsAck = true; - await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - } + bool isAck = true; + //foreach (var issuedEventMessage in issuedEventMessages) + //{ + // var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); + // if (heartbeatEntity == null) + // { + // isAck = false; + // break; + // } + // heartbeatEntity.AckTime = Clock.Now; + // heartbeatEntity.IsAck = true; + // await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); + // } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index dc49f57..74ed99c 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -67,10 +67,11 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - //[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); + return await SendMessagesAsync(receivedMessage); }