Compare commits
No commits in common. "0328aa08d909c5ce468ed52ddc03dcf13e4f24f7" and "1ae875606f75721ea8440936f99272dc03b2243e" have entirely different histories.
0328aa08d9
...
1ae875606f
@ -1,9 +1,7 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using Confluent.Kafka.Admin;
|
using Confluent.Kafka.Admin;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.AdminClient;
|
namespace JiShe.CollectBus.Kafka.AdminClient;
|
||||||
@ -11,17 +9,16 @@ namespace JiShe.CollectBus.Kafka.AdminClient;
|
|||||||
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
|
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
|
||||||
{
|
{
|
||||||
private readonly ILogger<AdminClientService> _logger;
|
private readonly ILogger<AdminClientService> _logger;
|
||||||
private readonly KafkaOptionConfig _kafkaOptionConfig;
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="AdminClientService" /> class.
|
/// Initializes a new instance of the <see cref="AdminClientService" /> class.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="configuration"></param>
|
/// <param name="configuration"></param>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
|
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_kafkaOptionConfig = kafkaOptionConfig.Value;
|
Instance = GetInstance(configuration);
|
||||||
Instance = GetInstance();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -145,19 +142,22 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the instance.
|
/// Gets the instance.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <param name="configuration">The configuration.</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public IAdminClient GetInstance()
|
public IAdminClient GetInstance(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
|
||||||
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
|
||||||
var adminClientConfig = new AdminClientConfig
|
var adminClientConfig = new AdminClientConfig
|
||||||
{
|
{
|
||||||
BootstrapServers = _kafkaOptionConfig.BootstrapServers
|
BootstrapServers = configuration["Kafka:BootstrapServers"]
|
||||||
};
|
};
|
||||||
if (_kafkaOptionConfig.EnableAuthorization)
|
if (enableAuthorization)
|
||||||
{
|
{
|
||||||
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||||
adminClientConfig.SaslMechanism = SaslMechanism.Plain;
|
adminClientConfig.SaslMechanism = SaslMechanism.Plain;
|
||||||
adminClientConfig.SaslUsername = _kafkaOptionConfig.SaslUserName;
|
adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||||
adminClientConfig.SaslPassword = _kafkaOptionConfig.SaslUserName;
|
adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||||
}
|
}
|
||||||
return new AdminClientBuilder(adminClientConfig).Build();
|
return new AdminClientBuilder(adminClientConfig).Build();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,8 +34,7 @@ namespace JiShe.CollectBus.Kafka.Internal
|
|||||||
ErrorCode.NotCoordinatorForGroup,
|
ErrorCode.NotCoordinatorForGroup,
|
||||||
ErrorCode.NetworkException,
|
ErrorCode.NetworkException,
|
||||||
ErrorCode.GroupCoordinatorNotAvailable,
|
ErrorCode.GroupCoordinatorNotAvailable,
|
||||||
ErrorCode.InvalidGroupId,
|
ErrorCode.InvalidGroupId
|
||||||
ErrorCode.IllegalGeneration
|
|
||||||
};
|
};
|
||||||
return ex switch
|
return ex switch
|
||||||
{
|
{
|
||||||
|
|||||||
@ -95,7 +95,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
DeviceId = $"{data.DeviceId}",
|
DeviceId = $"{data.DeviceId}",
|
||||||
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
|
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
|
||||||
ProjectId = $"{data.ProjectId}",
|
ProjectId = $"{data.ProjectId}",
|
||||||
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
|
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
|
||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
||||||
@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
DeviceId = $"{item.DeviceId}",
|
DeviceId = $"{item.DeviceId}",
|
||||||
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
|
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
|
||||||
ProjectId = $"{item.ProjectId}",
|
ProjectId = $"{item.ProjectId}",
|
||||||
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整
|
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整
|
||||||
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
|
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
|
||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
|
||||||
@ -275,7 +275,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
if (!data.TimeSpan.HasValue)
|
if (!data.TimeSpan.HasValue)
|
||||||
data.TimeSpan = analysisBaseDto.ReceivedTime;
|
data.TimeSpan = analysisBaseDto.ReceivedTime;
|
||||||
// 类型(心跳,登录,上电,掉电)
|
// 类型(心跳,登录,上电,掉电)
|
||||||
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
|
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeMilliseconds();
|
||||||
var treeData = new TreeModelSingleMeasuringEntity<T>()
|
var treeData = new TreeModelSingleMeasuringEntity<T>()
|
||||||
{
|
{
|
||||||
SystemName = _applicationOptions.SystemType,
|
SystemName = _applicationOptions.SystemType,
|
||||||
@ -287,20 +287,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
||||||
await _dbProvider.InsertAsync(treeData);
|
await _dbProvider.InsertAsync(treeData);
|
||||||
// 数据帧
|
|
||||||
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
|
|
||||||
{
|
|
||||||
SystemName = _applicationOptions.SystemType,
|
|
||||||
DeviceId = $"{data.DeviceId}",
|
|
||||||
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
|
|
||||||
ProjectId = $"{data.ProjectId}",
|
|
||||||
Timestamps = timestamps,
|
|
||||||
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty)
|
|
||||||
};
|
|
||||||
|
|
||||||
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
|
||||||
await _dbProvider.InsertAsync(treeFrameData);
|
|
||||||
|
|
||||||
// 时间
|
// 时间
|
||||||
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<long>()
|
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<long>()
|
||||||
{
|
{
|
||||||
@ -309,7 +295,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
|
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
|
||||||
ProjectId = $"{data.ProjectId}",
|
ProjectId = $"{data.ProjectId}",
|
||||||
Timestamps = timestamps,
|
Timestamps = timestamps,
|
||||||
SingleMeasuring = new Tuple<string, long>(ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds())
|
SingleMeasuring = new Tuple<string, long>(ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan!.Value.GetDateTimeOffset().ToUnixTimeNanoseconds())
|
||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
||||||
await _dbProvider.InsertAsync(treeRecordingTimeData);
|
await _dbProvider.InsertAsync(treeRecordingTimeData);
|
||||||
@ -325,7 +311,19 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
|||||||
};
|
};
|
||||||
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
||||||
await _dbProvider.InsertAsync(treeRemarkData);
|
await _dbProvider.InsertAsync(treeRemarkData);
|
||||||
|
// 数据帧
|
||||||
|
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
|
||||||
|
{
|
||||||
|
SystemName = _applicationOptions.SystemType,
|
||||||
|
DeviceId = $"{data.DeviceId}",
|
||||||
|
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
|
||||||
|
ProjectId = $"{data.ProjectId}",
|
||||||
|
Timestamps = timestamps,
|
||||||
|
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty )
|
||||||
|
};
|
||||||
|
|
||||||
|
_runtimeContext.UseTableSessionPool = false; // 使树模型池
|
||||||
|
await _dbProvider.InsertAsync(treeFrameData);
|
||||||
return await Task.FromResult(true);
|
return await Task.FromResult(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -170,7 +170,6 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
// await _deviceRepository.UpdateAsync(entity);
|
// await _deviceRepository.UpdateAsync(entity);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
|
||||||
var messageReceivedLoginEvent = new MessageReceivedLogin
|
var messageReceivedLoginEvent = new MessageReceivedLogin
|
||||||
{
|
{
|
||||||
ClientId = code,
|
ClientId = code,
|
||||||
@ -181,8 +180,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
MessageId = Guid.NewGuid().ToString(),
|
MessageId = Guid.NewGuid().ToString(),
|
||||||
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
||||||
};
|
};
|
||||||
//await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
||||||
|
|
||||||
var reqParam = new ReqParameter2
|
var reqParam = new ReqParameter2
|
||||||
{
|
{
|
||||||
AFN = AFN.确认或否认,
|
AFN = AFN.确认或否认,
|
||||||
@ -213,7 +211,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
{
|
{
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
|
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
// await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -268,7 +266,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
MessageId = Guid.NewGuid().ToString(),
|
MessageId = Guid.NewGuid().ToString(),
|
||||||
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
|
||||||
};
|
};
|
||||||
// await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
||||||
|
|
||||||
var reqParam = new ReqParameter2()
|
var reqParam = new ReqParameter2()
|
||||||
{
|
{
|
||||||
@ -301,7 +299,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
{
|
{
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
|
||||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
|
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
// await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -38,7 +38,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 电表自动阀控下行消息消费订阅
|
/// 电表自动阀控下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
|
|||||||
@ -62,19 +62,21 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
|
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
|
||||||
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = true;
|
bool isAck = false;
|
||||||
foreach (var issuedEventMessage in issuedEventMessages)
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
{
|
{
|
||||||
//var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
//if (loginEntity == null)
|
if (loginEntity == null)
|
||||||
//{
|
{
|
||||||
// isAck=false;
|
isAck=false;
|
||||||
// break;
|
break;
|
||||||
//}
|
}
|
||||||
|
|
||||||
|
loginEntity.AckTime = Clock.Now;
|
||||||
|
loginEntity.IsAck = true;
|
||||||
|
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
|
isAck = true;
|
||||||
|
|
||||||
//loginEntity.AckTime = Clock.Now;
|
|
||||||
//loginEntity.IsAck = true;
|
|
||||||
//await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
|
||||||
}
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
||||||
@ -83,19 +85,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
||||||
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = true;
|
bool isAck = false;
|
||||||
//foreach (var issuedEventMessage in issuedEventMessages)
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
//{
|
{
|
||||||
// var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
// if (heartbeatEntity == null)
|
if (heartbeatEntity == null)
|
||||||
// {
|
{
|
||||||
// isAck = false;
|
isAck = false;
|
||||||
// break;
|
break;
|
||||||
// }
|
}
|
||||||
// heartbeatEntity.AckTime = Clock.Now;
|
heartbeatEntity.AckTime = Clock.Now;
|
||||||
// heartbeatEntity.IsAck = true;
|
heartbeatEntity.IsAck = true;
|
||||||
// await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
// }
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,7 +5,6 @@ using JiShe.CollectBus.Kafka.Attributes;
|
|||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
|
|
||||||
@ -80,14 +79,12 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName,EnableBatch =true,TaskCount=30,BatchSize =500)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage)
|
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
{
|
{
|
||||||
//todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。
|
//todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。
|
||||||
//_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
|
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
|
||||||
_logger.LogWarning($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
|
return await SendMessagesAsync(receivedMessage);
|
||||||
return SubscribeAck.Success();
|
|
||||||
//return await SendMessagesAsync(receivedMessage);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" />
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" />
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user