Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual ACK packet support #855

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions rumqttc/examples/async_manual_acks_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rumqttc::v5::mqttbytes::v5::Packet;
use rumqttc::v5::mqttbytes::QoS;
use tokio::{task, time};

use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions};
use rumqttc::v5::{AsyncClient, Event, EventLoop, ManualAckReason, MqttOptions};
use std::error::Error;
use std::time::Duration;

Expand Down Expand Up @@ -65,7 +65,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Its important not to block notifier as this can cause deadlock.
let c = client.clone();
tokio::spawn(async move {
c.ack(&publish).await.unwrap();
let mut ack = c.get_manual_ack(&publish);
ack.set_reason(ManualAckReason::Success);
swanandx marked this conversation as resolved.
Show resolved Hide resolved
ack.set_reason_string("There is no error".to_string().into());
c.manual_ack(ack).await.unwrap();
});
}
}
Expand Down
67 changes: 67 additions & 0 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,33 @@ impl AsyncClient {
Ok(())
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.send_async(ack).await?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.try_send(ack)?;
}
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down Expand Up @@ -235,6 +262,22 @@ impl AsyncClient {
}
}


/// ManualAck packet for manual_ack
pub enum ManualAck {
None,
PubAck(PubAck),
PubRec(PubRec),
}

fn get_manual_ack_req(ack: ManualAck) -> Option<Request> {
match ack {
ManualAck::None => None,
ManualAck::PubAck(ack) => Some(Request::PubAck(ack)),
ManualAck::PubRec(ack) => Some(Request::PubRec(ack)),
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
Expand Down Expand Up @@ -323,6 +366,30 @@ impl Client {
Ok(())
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.client.request_tx.send(ack)?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
self.client.try_manual_ack(ack)?;
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down
132 changes: 132 additions & 0 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,33 @@ impl AsyncClient {
self.handle_try_publish(topic, qos, retain, payload, None)
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.send_async(ack).await?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.try_send(ack)?;
}
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down Expand Up @@ -450,6 +477,91 @@ impl AsyncClient {
}
}

/// Reasons for ManualAck Preparation
pub enum ManualAckReason {
Success,
NoMatchingSubscribers,
UnspecifiedError,
ImplementationSpecificError,
NotAuthorized,
TopicNameInvalid,
PacketIdentifierInUse,
QuotaExceeded,
PayloadFormatInvalid,
}
impl ManualAckReason {
pub fn code(&self) -> u8 {
match self {
ManualAckReason::Success => 0,
ManualAckReason::NoMatchingSubscribers => 16,
ManualAckReason::UnspecifiedError => 128,
ManualAckReason::ImplementationSpecificError => 131,
ManualAckReason::NotAuthorized => 135,
ManualAckReason::TopicNameInvalid => 144,
ManualAckReason::PacketIdentifierInUse => 145,
ManualAckReason::QuotaExceeded => 151,
ManualAckReason::PayloadFormatInvalid => 153,
}
}
}

/// ManualAck packet for manual_ack
pub enum ManualAck {
None,
PubAck(PubAck),
PubRec(PubRec),
}

impl ManualAck {
/// Set reason code for manual_ack sending
pub fn set_reason(&mut self, reason: ManualAckReason) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_code(reason.code()),
ManualAck::PubRec(ack) => ack.set_code(reason.code()),
}
self
}

/// Set reason code number for manual_ack sending
pub fn set_code(&mut self, code: u8) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_code(code),
ManualAck::PubRec(ack) => ack.set_code(code),
}
self
}

/// Set reason string on manual_ack properties
pub fn set_reason_string(&mut self, reason_string: Option<String>) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_reason_string(reason_string),
ManualAck::PubRec(ack) => ack.set_reason_string(reason_string),
}
self
}

/// Set user properties on manual_ack properties
pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_user_properties(user_properties),
ManualAck::PubRec(ack) => ack.set_user_properties(user_properties),
}
self
}
}

fn get_manual_ack_req(ack: ManualAck) -> Option<Request> {
match ack {
ManualAck::None => None,
ManualAck::PubAck(ack) => Some(Request::PubAck(ack)),
ManualAck::PubRec(ack) => Some(Request::PubRec(ack)),
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
Expand Down Expand Up @@ -584,6 +696,26 @@ impl Client {
self.client.try_publish(topic, qos, retain, payload)
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
self.client.get_manual_ack(publish)
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.client.request_tx.send(ack)?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
self.client.try_manual_ack(ack)?;
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{NetworkOptions, Transport};

use mqttbytes::v5::*;

pub use client::{AsyncClient, Client, ClientError, Connection, Iter};
pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAckReason};
pub use eventloop::{ConnectionError, Event, EventLoop};
pub use state::{MqttState, StateError};

Expand Down
26 changes: 26 additions & 0 deletions rumqttc/src/v5/mqttbytes/v5/puback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,32 @@ impl PubAck {
}
}

pub fn set_code(&mut self, code: u8) {
self.reason = reason(code).unwrap();
}

pub fn set_reason_string(&mut self, reason_string: Option<String>) {
if let Some(props) = &mut self.properties {
props.reason_string = reason_string;
} else {
self.properties = Some(PubAckProperties {
reason_string,
user_properties: Vec::<(String, String)>::new(),
});
}
}

pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) {
if let Some(props) = &mut self.properties {
props.user_properties = user_properties;
} else {
self.properties = Some(PubAckProperties {
reason_string: None,
user_properties,
});
}
}

pub fn size(&self) -> usize {
if self.reason == PubAckReason::Success && self.properties.is_none() {
return 4;
Expand Down
30 changes: 28 additions & 2 deletions rumqttc/src/v5/mqttbytes/v5/pubrec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,32 @@ impl PubRec {
}
}

pub fn set_code(&mut self, code: u8) {
self.reason = reason(code).unwrap();
}

pub fn set_reason_string(&mut self, reason_string: Option<String>) {
if let Some(props) = &mut self.properties {
props.reason_string = reason_string;
} else {
self.properties = Some(PubRecProperties {
reason_string,
user_properties: Vec::<(String, String)>::new(),
});
}
}

pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) {
if let Some(props) = &mut self.properties {
props.user_properties = user_properties;
} else {
self.properties = Some(PubRecProperties {
reason_string: None,
user_properties,
});
}
}

pub fn size(&self) -> usize {
let len = self.len();
let remaining_len_size = len_len(len);
Expand Down Expand Up @@ -83,12 +109,12 @@ impl PubRec {
}

let properties = PubRecProperties::read(&mut bytes)?;
let puback = PubRec {
let pubrec = PubRec {
pkid,
reason: reason(ack_reason)?,
properties,
};
Ok(puback)
Ok(pubrec)
}

pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
Expand Down