Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rumqtt): validate topic and topic filter #813

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
- Make v4::Connect::write return correct value
- Validate topic filter and topic name for MQTT v3 and v5 during `Publish`, `Subscribe`, and `Unsubscribe` operations.

### Security

Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"
memchr = "2.7.2"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
58 changes: 35 additions & 23 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,13 @@ impl AsyncClient {
where
S: Into<String>,
{
let mut publish = Publish::from_bytes(topic, qos, payload);
let topic = topic.into();
let mut publish = Publish::from_bytes(&topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(publish));
}
self.request_tx.send_async(publish).await?;
Ok(())
}
Expand All @@ -153,7 +157,7 @@ impl AsyncClient {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
if !valid_filter(topic) {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Expand All @@ -165,8 +169,8 @@ impl AsyncClient {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::TryRequest(request));
if !valid_filter(topic) {
return Err(ClientError::Request(request));
}
self.request_tx.try_send(request)?;
Ok(())
Expand All @@ -177,11 +181,10 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let subscribe = Subscribe::new_many(topics);
let is_invalid_filter = subscribe.filters.iter().any(|t| !valid_filter(&t.path));
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
if is_invalid_filter {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Expand All @@ -193,29 +196,36 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let subscribe = Subscribe::new_many(topics);
let is_invalid_filter = subscribe.filters.iter().any(|t| !valid_filter(&t.path));
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
if is_invalid_filter {
return Err(ClientError::Request(request));
}
self.request_tx.try_send(request)?;
Ok(())
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let topic = topic.into();
let unsubscribe = Unsubscribe::new(&topic);
let request = Request::Unsubscribe(unsubscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Ok(())
}

/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let topic = topic.into();
let unsubscribe = Unsubscribe::new(&topic);
let request = Request::Unsubscribe(unsubscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.request_tx.try_send(request)?;
Ok(())
}
Expand Down Expand Up @@ -319,8 +329,7 @@ impl Client {
S: Into<String>,
V: Into<Vec<u8>>,
{
self.client.try_publish(topic, qos, retain, payload)?;
Ok(())
self.client.try_publish(topic, qos, retain, payload)
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
Expand All @@ -344,7 +353,7 @@ impl Client {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
if !valid_filter(topic) {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Expand All @@ -362,11 +371,10 @@ impl Client {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let subscribe = Subscribe::new_many(topics);
let is_invalid_filter = subscribe.filters.iter().any(|t| !valid_filter(&t.path));
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
if is_invalid_filter {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Expand All @@ -382,8 +390,12 @@ impl Client {

/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let topic = topic.into();
let unsubscribe = Unsubscribe::new(&topic);
let request = Request::Unsubscribe(unsubscribe);
if !valid_filter(topic) {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Ok(())
}
Expand Down
57 changes: 43 additions & 14 deletions rumqttc/src/mqttbytes/topic.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,35 @@
use memchr::{memchr, memchr2};

/// Maximum length of a topic or topic filter according to
/// [MQTT-4.7.3-3](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718109)
pub const MAX_TOPIC_LEN: usize = 65535;

/// Checks if a topic or topic filter has wildcards
pub fn has_wildcards(s: &str) -> bool {
s.contains('+') || s.contains('#')
pub fn has_wildcards(s: impl AsRef<str>) -> bool {
memchr2(b'+', b'#', s.as_ref().as_bytes()).is_some()
}

/// Check if a topic is valid for PUBLISH packet.
pub fn valid_topic(topic: impl AsRef<str>) -> bool {
can_be_topic_or_filter(&topic) && !has_wildcards(topic)
}

/// Checks if a topic is valid
pub fn valid_topic(topic: &str) -> bool {
// topic can't contain wildcards
if topic.contains('+') || topic.contains('#') {
/// Check if a topic is valid to qualify as a topic name or topic filter.
///
/// According to MQTT v3 Spec, it has to follow the following rules:
/// 1. All Topic Names and Topic Filters MUST be at least one character long [MQTT-4.7.3-1]
/// 2. Topic Names and Topic Filters are case sensitive
/// 3. Topic Names and Topic Filters can include the space character
/// 4. A leading or trailing `/` creates a distinct Topic Name or Topic Filter
/// 5. A Topic Name or Topic Filter consisting only of the `/` character is valid
/// 6. Topic Names and Topic Filters MUST NOT include the null character (Unicode U+0000) [MQTT-4.7.3-2]
/// 7. Topic Names and Topic Filters are UTF-8 encoded strings, they MUST NOT encode to more than 65535 bytes.
fn can_be_topic_or_filter(topic_or_filter: impl AsRef<str>) -> bool {
let topic_or_filter = topic_or_filter.as_ref();
if topic_or_filter.is_empty()
|| topic_or_filter.len() > MAX_TOPIC_LEN
|| memchr(b'\0', topic_or_filter.as_bytes()).is_some()
{
return false;
}

Expand All @@ -16,8 +39,9 @@ pub fn valid_topic(topic: &str) -> bool {
/// Checks if the filter is valid
///
/// <https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106>
pub fn valid_filter(filter: &str) -> bool {
if filter.is_empty() {
pub fn valid_filter(filter: impl AsRef<str>) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd name this is_filter_valid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is a public function, we may run into the risk of backwards incompatibility due to name change. What do you think?

let filter = filter.as_ref();
if !can_be_topic_or_filter(filter) {
return false;
}

Expand All @@ -27,12 +51,14 @@ pub fn valid_filter(filter: &str) -> bool {
// split will never return an empty iterator
// even if the pattern isn't matched, the original string will be there
// so it is safe to just unwrap here!
let last = hirerarchy.next().unwrap();
let Some(last) = hirerarchy.next() else {
return false;
};

// only single '#" or '+' is allowed in last entry
// invalid: sport/tennis#
// invalid: sport/++
if last.len() != 1 && (last.contains('#') || last.contains('+')) {
if last.len() != 1 && has_wildcards(last) {
return false;
}

Expand All @@ -41,13 +67,13 @@ pub fn valid_filter(filter: &str) -> bool {
// # is not allowed in filter except as a last entry
// invalid: sport/tennis#/player
// invalid: sport/tennis/#/ranking
if entry.contains('#') {
if memchr(b'#', entry.as_bytes()).is_some() {
return false;
}

// + must occupy an entire level of the filter
// invalid: sport+
if entry.len() > 1 && entry.contains('+') {
if entry.len() > 1 && memchr(b'+', entry.as_bytes()).is_some() {
return false;
}
}
Expand All @@ -60,8 +86,11 @@ pub fn valid_filter(filter: &str) -> bool {
/// **NOTE**: 'topic' is a misnomer in the arg. this can also be used to match 2 wild subscriptions
/// **NOTE**: make sure a topic is validated during a publish and filter is validated
/// during a subscribe
pub fn matches(topic: &str, filter: &str) -> bool {
if !topic.is_empty() && topic[..1].contains('$') {
pub fn matches(topic: impl AsRef<str>, filter: impl AsRef<str>) -> bool {
let topic = topic.as_ref();
let filter = filter.as_ref();

if !topic.is_empty() && memchr(b'$', topic[..1].as_bytes()).is_some() {
return false;
}

Expand Down
12 changes: 5 additions & 7 deletions rumqttc/src/mqttbytes/v4/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ impl Subscribe {
}
}

pub fn new_many<T>(topics: T) -> Subscribe
where
T: IntoIterator<Item = SubscribeFilter>,
{
let filters: Vec<SubscribeFilter> = topics.into_iter().collect();

Subscribe { pkid: 0, filters }
pub fn new_many(topics: impl IntoIterator<Item = SubscribeFilter>) -> Subscribe {
Subscribe {
pkid: 0,
filters: topics.into_iter().collect(),
}
}

pub fn add(&mut self, path: String, qos: QoS) -> &mut Self {
Expand Down
Loading
Loading