Skip to content

Commit

Permalink
refactor: roll out Network::connect
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed May 29, 2024
1 parent a1282cd commit e63bcab
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 43 deletions.
15 changes: 6 additions & 9 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl EventLoop {

requests_in_channel.retain(|request| {
match request {
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
_ => true,
}
});
Expand Down Expand Up @@ -486,18 +486,15 @@ async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();

let mut connect = Connect::new(options.client_id());
connect.keep_alive = keep_alive;
connect.clean_session = clean_session;
connect.last_will = last_will;
connect.keep_alive = options.keep_alive().as_secs() as u16;
connect.clean_session = options.clean_session();
connect.last_will = options.last_will();
connect.login = options.credentials();

// send mqtt connect packet
network.connect(connect).await?;
network.write(Packet::Connect(connect)).await?;
network.flush().await?;

// validate connack
match network.read().await? {
Expand Down
6 changes: 0 additions & 6 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ impl Network {
.map_err(StateError::Deserialization)
}

pub async fn connect(&mut self, connect: Connect) -> Result<(), StateError> {
self.write(Packet::Connect(connect)).await?;

self.flush().await
}

pub async fn flush(&mut self) -> Result<(), crate::state::StateError> {
self.framed
.flush()
Expand Down
26 changes: 13 additions & 13 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl EventLoop {

requests_in_channel.retain(|request| {
match request {
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
_ => true,
}
});
Expand Down Expand Up @@ -398,20 +398,20 @@ async fn mqtt_connect(
options: &mut MqttOptions,
network: &mut Network,
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
let properties = options.connect_properties();

let connect = Connect {
keep_alive,
client_id,
clean_start,
properties,
};
let packet = Packet::Connect(
Connect {
client_id: options.client_id(),
keep_alive: options.keep_alive().as_secs() as u16,
clean_start: options.clean_start(),
properties: options.connect_properties(),
},
options.last_will(),
options.credentials(),
);

// send mqtt connect packet
network.connect(connect, options).await?;
network.write(packet).await?;
network.flush().await?;

// validate connack
match network.read().await? {
Expand Down
16 changes: 1 addition & 15 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use tokio_util::codec::Framed;
use crate::framed::AsyncReadWrite;

use super::mqttbytes::v5::Packet;
use super::{mqttbytes, Codec, Connect, MqttOptions, MqttState};
use super::{Incoming, StateError};
use super::{mqttbytes, Codec, Incoming, MqttState, StateError};

/// Network transforms packets <-> frames efficiently. It takes
/// advantage of pre-allocation, buffering and vectorization when
Expand Down Expand Up @@ -86,19 +85,6 @@ impl Network {
.map_err(StateError::Deserialization)
}

pub async fn connect(
&mut self,
connect: Connect,
options: &MqttOptions,
) -> Result<(), StateError> {
let last_will = options.last_will();
let login = options.credentials();
self.write(Packet::Connect(connect, last_will, login))
.await?;

self.flush().await
}

pub async fn flush(&mut self) -> Result<(), StateError> {
self.framed
.flush()
Expand Down

0 comments on commit e63bcab

Please sign in to comment.