Skip to content

Commit

Permalink
fix: don't panic on connection closure (#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Mar 26, 2024
1 parent a7a0c54 commit af55848
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 4 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection

### Changed

Expand Down
6 changes: 4 additions & 2 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ impl Network {
pub async fn read(&mut self) -> Result<Incoming, StateError> {
match self.framed.next().await {
Some(Ok(packet)) => Ok(packet),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(),
Some(Err(e)) => Err(StateError::Deserialization(e)),
None => Err(StateError::ConnectionAborted),
}
}

Expand All @@ -61,8 +62,9 @@ impl Network {
break;
}
}
Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(),
Some(Err(e)) => return Err(StateError::Deserialization(e)),
None => return Err(StateError::ConnectionAborted),
}
// do not wait for subsequent reads
match self.framed.next().now_or_never() {
Expand Down
2 changes: 2 additions & 0 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum StateError {
EmptySubscription,
#[error("Mqtt serialization/deserialization error: {0}")]
Deserialization(#[from] mqttbytes::Error),
#[error("Connection closed by peer abruptly")]
ConnectionAborted,
}

/// State of the mqtt connection.
Expand Down
6 changes: 4 additions & 2 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ impl Network {
pub async fn read(&mut self) -> Result<Incoming, StateError> {
match self.framed.next().await {
Some(Ok(packet)) => Ok(packet),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(),
Some(Err(e)) => Err(StateError::Deserialization(e)),
None => Err(StateError::ConnectionAborted),
}
}

Expand All @@ -63,8 +64,9 @@ impl Network {
break;
}
}
Some(Err(mqttbytes::Error::InsufficientBytes(_))) | None => unreachable!(),
Some(Err(mqttbytes::Error::InsufficientBytes(_))) => unreachable!(),
Some(Err(e)) => return Err(StateError::Deserialization(e)),
None => return Err(StateError::ConnectionAborted),
}
// do not wait for subsequent reads
match self.framed.next().now_or_never() {
Expand Down
2 changes: 2 additions & 0 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum StateError {
PubCompFail { reason: PubCompReason },
#[error("Connection failed with reason '{reason:?}' ")]
ConnFail { reason: ConnectReturnCode },
#[error("Connection closed by peer abruptly")]
ConnectionAborted
}

impl From<mqttbytes::Error> for StateError {
Expand Down

0 comments on commit af55848

Please sign in to comment.