From af55848607e1d379db17734f226023310bf14123 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 26 Mar 2024 13:46:17 +0530 Subject: [PATCH] fix: don't panic on connection closure (#833) --- rumqttc/CHANGELOG.md | 1 + rumqttc/src/framed.rs | 6 ++++-- rumqttc/src/state.rs | 2 ++ rumqttc/src/v5/framed.rs | 6 ++++-- rumqttc/src/v5/state.rs | 2 ++ 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 059e7215..1045cfcf 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `size()` method on `Packet` calculates size once serialized. * `read()` and `write()` methods on `Packet`. +* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection ### Changed diff --git a/rumqttc/src/framed.rs b/rumqttc/src/framed.rs index 4ccfcdad..a1c45d7a 100644 --- a/rumqttc/src/framed.rs +++ b/rumqttc/src/framed.rs @@ -38,8 +38,9 @@ impl Network { pub async fn read(&mut self) -> Result { match self.framed.next().await { Some(Ok(packet)) => Ok(packet), - Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(), + Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(), Some(Err(e)) => Err(StateError::Deserialization(e)), + None => Err(StateError::ConnectionAborted), } } @@ -61,8 +62,9 @@ impl Network { break; } } - Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(), + Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(), Some(Err(e)) => return Err(StateError::Deserialization(e)), + None => return Err(StateError::ConnectionAborted), } // do not wait for subsequent reads match self.framed.next().now_or_never() { diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 1678caff..408741e3 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -29,6 +29,8 @@ pub enum StateError { EmptySubscription, #[error("Mqtt serialization/deserialization error: {0}")] Deserialization(#[from] mqttbytes::Error), + #[error("Connection closed by peer abruptly")] + ConnectionAborted, } /// State of the mqtt connection. diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index 961f6b4d..c7e06a25 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -40,8 +40,9 @@ impl Network { pub async fn read(&mut self) -> Result { match self.framed.next().await { Some(Ok(packet)) => Ok(packet), - Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(), + Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(), Some(Err(e)) => Err(StateError::Deserialization(e)), + None => Err(StateError::ConnectionAborted), } } @@ -63,8 +64,9 @@ impl Network { break; } } - Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(), + Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(), Some(Err(e)) => return Err(StateError::Deserialization(e)), + None => return Err(StateError::ConnectionAborted), } // do not wait for subsequent reads match self.framed.next().now_or_never() { diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 95c9c896..854aa7b0 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -64,6 +64,8 @@ pub enum StateError { PubCompFail { reason: PubCompReason }, #[error("Connection failed with reason '{reason:?}' ")] ConnFail { reason: ConnectReturnCode }, + #[error("Connection closed by peer abruptly")] + ConnectionAborted } impl From for StateError {