using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Protocol376Simulator.Interfaces;
using Protocol376Simulator.Models;
using Protocol376Simulator.Services;
using Serilog;
namespace Protocol376Simulator.Simulators
{
///
/// 集中器模拟器类
///
public class ConcentratorSimulator : ISimulator
{
public readonly string _concentratorAddress;
private readonly NetworkService _networkService;
private readonly HeartbeatService _heartbeatService;
private readonly ReconnectionService _reconnectionService;
private readonly StatisticsService _statisticsService;
private readonly DeviceDataService _deviceDataService;
private bool _autoResponse = true;
private DateTime _lastLoginTime = DateTime.MinValue;
private bool _loginConfirmed = false;
///
/// 当接收到消息时触发
///
public event EventHandler MessageReceived;
///
/// 当状态变更时触发
///
public event EventHandler StatusChanged;
///
/// 是否已连接
///
public bool IsConnected => _networkService.IsConnected;
///
/// 是否已登录
///
public bool IsLoggedIn => _lastLoginTime > DateTime.MinValue && _loginConfirmed;
///
/// 阀门状态
///
public bool ValveStatus => _deviceDataService.ValveStatus;
///
/// 成功发送的心跳次数
///
public int SuccessfulHeartbeats => _heartbeatService.SuccessfulHeartbeats;
///
/// 最后登录时间
///
public DateTime LastLoginTime => _lastLoginTime;
///
/// 是否启用自动重连
///
public bool AutoReconnect
{
get => _reconnectionService.AutoReconnect;
set => _reconnectionService.AutoReconnect = value;
}
///
/// 重连尝试次数
///
public int ReconnectAttempts => _reconnectionService.ReconnectAttempts;
///
/// 是否正在重连
///
public bool IsReconnecting => _reconnectionService.IsReconnecting;
///
/// 构造函数
///
/// 集中器地址
/// 服务器地址
/// 服务器端口
public ConcentratorSimulator(string concentratorAddress, string serverAddress, int serverPort)
{
_concentratorAddress = concentratorAddress;
// 初始化网络服务
_networkService = new NetworkService(serverAddress, serverPort, $"集中器({concentratorAddress})");
_networkService.MessageReceived += OnMessageReceived;
_networkService.ConnectionStatusChanged += OnConnectionStatusChanged;
_networkService.ErrorOccurred += OnNetworkError;
// 初始化心跳服务
_heartbeatService = new HeartbeatService($"集中器({concentratorAddress})", SendHeartbeatMessageAsync);
// 初始化重连服务
_reconnectionService = new ReconnectionService(_networkService, $"集中器({concentratorAddress})");
_reconnectionService.ReconnectAttemptCompleted += OnReconnectAttemptCompleted;
// 初始化统计服务
_statisticsService = new StatisticsService($"集中器({concentratorAddress})");
// 初始化设备数据服务
_deviceDataService = new DeviceDataService($"集中器({concentratorAddress})");
_deviceDataService.ValveStatusChanged += OnValveStatusChanged;
}
///
/// 启动模拟器
///
/// 是否自动登录
/// 是否自动发送心跳
public async Task StartAsync(bool autoLogin = false, bool autoHeartbeat = false)
{
try
{
Log.Information("集中器 (地址: {Address}) 正在启动...", _concentratorAddress);
// 连接到服务器
await _networkService.ConnectAsync();
// 如果启用自动登录,发送登录消息
if (autoLogin)
{
// 短暂延迟,确保连接稳定
await Task.Delay(100);
await SendLoginMessageAsync();
Log.Information("集中器 (地址: {Address}) 自动登录已发送", _concentratorAddress);
}
// 如果启用自动心跳,启动心跳任务
if (autoHeartbeat)
{
StartHeartbeat();
}
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 启动失败: {ErrorMessage}", _concentratorAddress, ex.Message);
StatusChanged?.Invoke(this, $"启动失败: {ex.Message}");
// 异常继续抛出,由调用者处理
throw;
}
}
///
/// 停止模拟器
///
public async Task StopAsync()
{
try
{
// 停止心跳服务
_heartbeatService.Stop();
// 断开网络连接
await _networkService.DisconnectAsync();
Log.Information("集中器 (地址: {Address}) 已停止", _concentratorAddress);
StatusChanged?.Invoke(this, "已停止");
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 停止时发生错误: {ErrorMessage}",
_concentratorAddress, ex.Message);
}
}
///
/// 发送登录消息
///
public async Task SendLoginMessageAsync()
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送登录消息", _concentratorAddress);
return;
}
try
{
var loginMessage = Protocol376Message.CreateLoginMessage(_concentratorAddress);
await SendMessageAsync(loginMessage);
_lastLoginTime = DateTime.Now;
StatusChanged?.Invoke(this, "已发送登录请求");
Log.Information("集中器 (地址: {Address}) 发送登录消息", _concentratorAddress);
Log.Debug("集中器 (地址: {Address}) A&C报文详情: {MessageInfo}",
_concentratorAddress, loginMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送登录消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("登录消息发送失败");
throw;
}
}
///
/// 发送心跳消息
///
public async Task SendHeartbeatMessageAsync()
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送心跳消息", _concentratorAddress);
return;
}
try
{
var heartbeatMessage = Protocol376Message.CreateHeartbeatMessage(_concentratorAddress);
await SendMessageAsync(heartbeatMessage);
Log.Information("集中器 (地址: {Address}) 发送心跳消息", _concentratorAddress);
Log.Debug("集中器 (地址: {Address}) 心跳报文详情: {MessageInfo}",
_concentratorAddress, heartbeatMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送心跳消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("心跳消息发送失败");
throw;
}
}
///
/// 发送阀控操作消息
///
/// 阀门操作:1=开阀,2=关阀,3=查询状态
public async Task SendValveControlMessageAsync(byte valveOperation)
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送阀控消息", _concentratorAddress);
return;
}
try
{
var valveMessage = Protocol376Message.CreateValveControlMessage(_concentratorAddress, valveOperation);
await SendMessageAsync(valveMessage);
Log.Information("集中器 (地址: {Address}) 发送阀控消息, 操作: {Operation}",
_concentratorAddress, valveOperation);
Log.Debug("集中器 (地址: {Address}) 阀控报文详情: {MessageInfo}",
_concentratorAddress, valveMessage.GetMessageInfo());
// 如果是开阀或关阀操作,更新阀门状态
if (valveOperation == 1)
{
_deviceDataService.UpdateValveStatus(true);
}
else if (valveOperation == 2)
{
_deviceDataService.UpdateValveStatus(false);
}
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送阀控消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("阀控消息发送失败");
throw;
}
}
///
/// 发送数据上传消息
///
/// 数据类型
public async Task SendDataUploadMessageAsync(byte dataType)
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送数据上传消息", _concentratorAddress);
return;
}
try
{
// 获取表计数据
byte[] meterData = _deviceDataService.GetMeterData(dataType);
if (meterData.Length == 0)
{
Log.Warning("集中器 (地址: {Address}) 数据类型 {DataType} 不存在表计数据",
_concentratorAddress, dataType);
return;
}
var dataMessage = Protocol376Message.CreateDataUploadMessage(_concentratorAddress, meterData);
await SendMessageAsync(dataMessage);
Log.Information("集中器 (地址: {Address}) 发送数据上传消息, 类型: {DataType}",
_concentratorAddress, dataType);
Log.Debug("集中器 (地址: {Address}) 上传报文详情: {MessageInfo}",
_concentratorAddress, dataMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送数据上传消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("数据上传消息发送失败");
throw;
}
}
///
/// 发送读数据消息
///
/// 数据类型
public async Task SendReadDataMessageAsync(byte dataType)
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送读数据消息", _concentratorAddress);
return;
}
try
{
var readMessage = Protocol376Message.CreateReadDataMessage(_concentratorAddress, dataType);
await SendMessageAsync(readMessage);
Log.Information("集中器 (地址: {Address}) 发送读数据消息, 类型: {DataType}",
_concentratorAddress, dataType);
Log.Debug("集中器 (地址: {Address}) 读数据报文详情: {MessageInfo}",
_concentratorAddress, readMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送读数据消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("读数据消息发送失败");
throw;
}
}
///
/// 发送设置参数消息
///
/// 参数类型
/// 参数数据
public async Task SendSetParameterMessageAsync(byte paramType, byte[] paramData)
{
if (!IsConnected)
{
Log.Warning("集中器 (地址: {Address}) 未连接,无法发送设置参数消息", _concentratorAddress);
return;
}
try
{
var paramMessage = Protocol376Message.CreateSetParameterMessage(_concentratorAddress, paramType, paramData);
await SendMessageAsync(paramMessage);
Log.Information("集中器 (地址: {Address}) 发送设置参数消息, 类型: {ParamType}",
_concentratorAddress, paramType);
Log.Debug("集中器 (地址: {Address}) 参数设置报文详情: {MessageInfo}",
_concentratorAddress, paramMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 发送设置参数消息失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError("设置参数消息发送失败");
throw;
}
}
///
/// 发送消息的通用方法
///
/// 要发送的消息
private async Task SendMessageAsync(IProtocolMessage message)
{
if (!IsConnected)
{
throw new InvalidOperationException("未连接到服务器,无法发送消息");
}
byte[] messageBytes = message.ToBytes();
await _networkService.SendMessageAsync(messageBytes);
// 记录统计信息
_statisticsService.RecordMessageSent();
_statisticsService.RecordMessageType(message.Type);
}
///
/// 启动心跳
///
public void StartHeartbeat()
{
_heartbeatService.Start();
StatusChanged?.Invoke(this, "心跳已启动");
Log.Information("集中器 (地址: {Address}) 自动心跳已启动", _concentratorAddress);
}
///
/// 停止心跳
///
public void StopHeartbeat()
{
_heartbeatService.Stop();
StatusChanged?.Invoke(this, "心跳已停止");
Log.Information("集中器 (地址: {Address}) 自动心跳已停止", _concentratorAddress);
}
///
/// 设置是否自动响应
///
/// 是否启用自动响应
public void SetAutoResponse(bool enabled)
{
_autoResponse = enabled;
Log.Information("集中器 (地址: {Address}) 自动响应已{Status}",
_concentratorAddress, enabled ? "启用" : "禁用");
}
///
/// 更新表计数据
///
public void UpdateMeterData(byte dataType, byte[] data)
{
_deviceDataService.SetMeterData(dataType, data);
}
///
/// 获取模拟器状态
///
public string GetStatus()
{
var status = new StringBuilder();
status.AppendLine($"集中器 (地址: {_concentratorAddress}) 状态:");
status.AppendLine($"连接状态: {(IsConnected ? "已连接" : "未连接")}");
status.AppendLine($"登录状态: {(IsLoggedIn ? "已登录" : "未登录")}");
if (IsLoggedIn)
{
status.AppendLine($"登录时间: {_lastLoginTime}");
}
status.AppendLine($"心跳状态: {(_heartbeatService.IsRunning ? "运行中" : "已停止")}");
status.AppendLine($"心跳次数: {_heartbeatService.SuccessfulHeartbeats}");
status.AppendLine($"阀门状态: {(_deviceDataService.ValveStatus ? "开启" : "关闭")}");
return status.ToString();
}
///
/// 获取通信统计信息
///
public string GetCommunicationStatistics()
{
return _statisticsService.GetStatisticsReport();
}
///
/// 设置重连参数
///
public void SetReconnectParameters(bool autoReconnect, int maxAttempts, int delaySeconds)
{
_reconnectionService.SetReconnectParameters(autoReconnect, maxAttempts, delaySeconds);
}
///
/// 接收消息处理
///
private void OnMessageReceived(object sender, byte[] message)
{
try
{
// 触发接收消息事件
MessageReceived?.Invoke(this, message);
// 处理接收到的消息
ProcessReceivedMessage(message);
// 统计信息记录
_statisticsService.RecordMessageReceived();
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 处理接收消息时发生错误: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError($"消息处理错误: {ex.Message}");
}
}
///
/// 处理接收到的消息
///
private void ProcessReceivedMessage(byte[] message)
{
try
{
// 解析消息
var receivedMessage = Protocol376Message.ParseFromBytes(message);
// 检查是否是登录确认消息
CheckLoginConfirmation(receivedMessage);
// 如果启用了自动响应,处理自动响应
if (_autoResponse)
{
_ = HandleAutoResponse(receivedMessage);
}
Log.Debug("集中器 (地址: {Address}) 收到消息: {MessageInfo}",
_concentratorAddress, receivedMessage.GetMessageInfo());
}
catch (Exception ex)
{
Log.Warning("集中器 (地址: {Address}) 消息解析失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
}
}
///
/// 检查是否是登录确认消息
///
private void CheckLoginConfirmation(Protocol376Message message)
{
// 检查是否是响应类型消息,且AFN为登录响应
if (message.Type == MessageType.Response && message.Data.Length >= 4 && message.Data[0] == 0x00)
{
// 检查是否包含数据单元标识
if (message.Data[1] == 0x02 && message.Data[2] == 0x70)
{
// 检查结果码
if (message.Data[3] == 0x00)
{
_loginConfirmed = true;
Log.Information("集中器 (地址: {Address}) 登录确认成功", _concentratorAddress);
StatusChanged?.Invoke(this, "登录成功");
}
else
{
_loginConfirmed = false;
Log.Warning("集中器 (地址: {Address}) 登录确认失败, 结果码: {ResultCode}",
_concentratorAddress, message.Data[3]);
StatusChanged?.Invoke(this, $"登录失败, 结果码: {message.Data[3]}");
}
}
}
}
///
/// 处理自动响应
///
private async Task HandleAutoResponse(Protocol376Message receivedMessage)
{
try
{
// 根据接收到的消息类型,生成对应的响应消息
Protocol376Message responseMessage = null;
switch (receivedMessage.Type)
{
case MessageType.Login:
// 登录请求的响应
byte[] loginResponseData = new byte[] { 0x00, 0x02, 0x70, 0x00 }; // AFN=0, 成功
responseMessage = new Protocol376Message
{
ControlCode = 0x00,
Address = receivedMessage.Address,
Data = loginResponseData,
Type = MessageType.Response
};
break;
case MessageType.Heartbeat:
// 心跳消息的响应
byte[] heartbeatResponseData = new byte[] { 0x00, 0x02, 0x70, 0x00 }; // AFN=0, 成功
responseMessage = new Protocol376Message
{
ControlCode = 0x00,
Address = receivedMessage.Address,
Data = heartbeatResponseData,
Type = MessageType.Response
};
break;
case MessageType.ValveControl:
// 阀控操作的响应
byte[] valveResponseData = new byte[] { 0x00, 0x02, 0x70, 0x00 }; // AFN=0, 成功
responseMessage = new Protocol376Message
{
ControlCode = 0x00,
Address = receivedMessage.Address,
Data = valveResponseData,
Type = MessageType.Response
};
break;
case MessageType.DataUpload:
// 数据上传的响应
byte[] dataUploadResponseData = new byte[] { 0x00, 0x02, 0x70, 0x00 }; // AFN=0, 成功
responseMessage = new Protocol376Message
{
ControlCode = 0x00,
Address = receivedMessage.Address,
Data = dataUploadResponseData,
Type = MessageType.Response
};
break;
case MessageType.ReadData:
// 读数据请求的响应
if (receivedMessage.Data.Length >= 6)
{
byte dataType = receivedMessage.Data[5];
byte[] meterData = _deviceDataService.GetMeterData(dataType);
if (meterData.Length > 0)
{
// 构造响应数据
byte[] readDataResponseData = new byte[5 + meterData.Length];
readDataResponseData[0] = 0x0B; // AFN
readDataResponseData[1] = 0x02; // 数据单元标识1
readDataResponseData[2] = 0x70; // 数据单元标识2
readDataResponseData[3] = 0x00; // 结果码, 成功
readDataResponseData[4] = dataType; // 数据类型
// 拷贝表计数据
Array.Copy(meterData, 0, readDataResponseData, 5, meterData.Length);
responseMessage = new Protocol376Message
{
ControlCode = 0x00,
Address = receivedMessage.Address,
Data = readDataResponseData,
Type = MessageType.Response
};
}
}
break;
}
// 发送响应消息
if (responseMessage != null)
{
await SendMessageAsync(responseMessage);
Log.Debug("集中器 (地址: {Address}) 自动响应: {MessageInfo}",
_concentratorAddress, responseMessage.GetMessageInfo());
}
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 处理自动响应时发生错误: {ErrorMessage}",
_concentratorAddress, ex.Message);
_statisticsService.RecordError($"自动响应错误: {ex.Message}");
}
}
///
/// 阀门状态变更处理
///
private void OnValveStatusChanged(object sender, bool isOpen)
{
StatusChanged?.Invoke(this, $"阀门状态已变更为: {(isOpen ? "开启" : "关闭")}");
}
///
/// 连接状态变更处理
///
private void OnConnectionStatusChanged(object sender, NetworkService.ConnectionStatus status)
{
switch (status)
{
case NetworkService.ConnectionStatus.Connected:
StatusChanged?.Invoke(this, "已连接到服务器");
break;
case NetworkService.ConnectionStatus.Disconnected:
_loginConfirmed = false;
StatusChanged?.Invoke(this, "与服务器断开连接");
break;
case NetworkService.ConnectionStatus.Failed:
_loginConfirmed = false;
StatusChanged?.Invoke(this, "连接失败");
break;
case NetworkService.ConnectionStatus.Reconnecting:
StatusChanged?.Invoke(this, "正在重新连接");
break;
}
}
///
/// 网络错误处理
///
private void OnNetworkError(object sender, Exception ex)
{
_statisticsService.RecordError($"网络错误: {ex.Message}");
StatusChanged?.Invoke(this, $"网络错误: {ex.Message}");
}
///
/// 重连完成处理
///
private async void OnReconnectAttemptCompleted(object sender, bool success)
{
if (success)
{
StatusChanged?.Invoke(this, "重连成功");
// 重新登录
try
{
await Task.Delay(1000); // 等待1秒确保连接稳定
await SendLoginMessageAsync();
Log.Information("集中器 (地址: {Address}) 重连后自动登录", _concentratorAddress);
}
catch (Exception ex)
{
Log.Error(ex, "集中器 (地址: {Address}) 重连后登录失败: {ErrorMessage}",
_concentratorAddress, ex.Message);
}
}
else
{
StatusChanged?.Invoke(this, $"重连失败 (尝试次数: {ReconnectAttempts})");
}
}
}
}