From e63bcab6cdfe22e4e8cca85011f34cc3bff21948 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 29 May 2024 22:45:09 +0530 Subject: [PATCH] refactor: roll out `Network::connect` --- rumqttc/src/eventloop.rs | 15 ++++++--------- rumqttc/src/framed.rs | 6 ------ rumqttc/src/v5/eventloop.rs | 26 +++++++++++++------------- rumqttc/src/v5/framed.rs | 16 +--------------- 4 files changed, 20 insertions(+), 43 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index f6730fe3..d31690d9 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -134,7 +134,7 @@ impl EventLoop { requests_in_channel.retain(|request| { match request { - Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack + Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack _ => true, } }); @@ -486,18 +486,15 @@ async fn mqtt_connect( options: &MqttOptions, network: &mut Network, ) -> Result { - let keep_alive = options.keep_alive().as_secs() as u16; - let clean_session = options.clean_session(); - let last_will = options.last_will(); - let mut connect = Connect::new(options.client_id()); - connect.keep_alive = keep_alive; - connect.clean_session = clean_session; - connect.last_will = last_will; + connect.keep_alive = options.keep_alive().as_secs() as u16; + connect.clean_session = options.clean_session(); + connect.last_will = options.last_will(); connect.login = options.credentials(); // send mqtt connect packet - network.connect(connect).await?; + network.write(Packet::Connect(connect)).await?; + network.flush().await?; // validate connack match network.read().await? { diff --git a/rumqttc/src/framed.rs b/rumqttc/src/framed.rs index a1c45d7a..beb1d32d 100644 --- a/rumqttc/src/framed.rs +++ b/rumqttc/src/framed.rs @@ -84,12 +84,6 @@ impl Network { .map_err(StateError::Deserialization) } - pub async fn connect(&mut self, connect: Connect) -> Result<(), StateError> { - self.write(Packet::Connect(connect)).await?; - - self.flush().await - } - pub async fn flush(&mut self) -> Result<(), crate::state::StateError> { self.framed .flush() diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index ea7076dd..cd0568ad 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -130,7 +130,7 @@ impl EventLoop { requests_in_channel.retain(|request| { match request { - Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack + Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack _ => true, } }); @@ -398,20 +398,20 @@ async fn mqtt_connect( options: &mut MqttOptions, network: &mut Network, ) -> Result { - let keep_alive = options.keep_alive().as_secs() as u16; - let clean_start = options.clean_start(); - let client_id = options.client_id(); - let properties = options.connect_properties(); - - let connect = Connect { - keep_alive, - client_id, - clean_start, - properties, - }; + let packet = Packet::Connect( + Connect { + client_id: options.client_id(), + keep_alive: options.keep_alive().as_secs() as u16, + clean_start: options.clean_start(), + properties: options.connect_properties(), + }, + options.last_will(), + options.credentials(), + ); // send mqtt connect packet - network.connect(connect, options).await?; + network.write(packet).await?; + network.flush().await?; // validate connack match network.read().await? { diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index c7e06a25..4a5c3049 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -5,8 +5,7 @@ use tokio_util::codec::Framed; use crate::framed::AsyncReadWrite; use super::mqttbytes::v5::Packet; -use super::{mqttbytes, Codec, Connect, MqttOptions, MqttState}; -use super::{Incoming, StateError}; +use super::{mqttbytes, Codec, Incoming, MqttState, StateError}; /// Network transforms packets <-> frames efficiently. It takes /// advantage of pre-allocation, buffering and vectorization when @@ -86,19 +85,6 @@ impl Network { .map_err(StateError::Deserialization) } - pub async fn connect( - &mut self, - connect: Connect, - options: &MqttOptions, - ) -> Result<(), StateError> { - let last_will = options.last_will(); - let login = options.credentials(); - self.write(Packet::Connect(connect, last_will, login)) - .await?; - - self.flush().await - } - pub async fn flush(&mut self) -> Result<(), StateError> { self.framed .flush()