Skip to content

Commit

Permalink
docs: document filter
Browse files Browse the repository at this point in the history
  • Loading branch information
mdrssv committed Sep 6, 2024
1 parent 64a8540 commit c909271
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
3 changes: 1 addition & 2 deletions rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::{collections::HashMap, path::Path};

use router::PublishFilterRef;
use serde::{Deserialize, Serialize};
use tracing_subscriber::{
filter::EnvFilter,
Expand All @@ -22,7 +21,7 @@ use tracing_subscriber::{
pub use link::alerts;
pub use link::local;
pub use link::meters;
pub use router::{Alert, IncomingMeter, Meter, Notification, OutgoingMeter};
pub use router::{Alert, IncomingMeter, Meter, Notification, OutgoingMeter, PublishFilter, PublishFilterRef};
use segments::Storage;
pub use server::Broker;

Expand Down
56 changes: 46 additions & 10 deletions rumqttd/src/router/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use std::{fmt::Debug, ops::Deref, sync::Arc};

use crate::protocol::{Publish, PublishProperties};

/// Filter for [`Publish`] packets
pub trait PublishFilter {
/// Determines weather an [`Publish`] packet should be processed
/// Arguments:
/// * `packet`: to be published, may be modified if necessary
/// * `properties`: received along with the packet, may be `None` for older MQTT versions
/// Returns: [`bool`] indicating if the packet should be processed
fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool;
}

/// Container for either an owned [`PublishFilter`] or an `'static` reference
#[derive(Clone)]
pub enum PublishFilterRef {
Owned(Arc<dyn PublishFilter + Send + Sync>),
Expand All @@ -15,8 +22,8 @@ pub enum PublishFilterRef {
impl Debug for PublishFilterRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Owned(arg0) => f.debug_tuple("Owned").finish(),
Self::Static(arg0) => f.debug_tuple("Static").finish(),
Self::Owned(_arg0) => f.debug_tuple("Owned").finish(),
Self::Static(_arg0) => f.debug_tuple("Static").finish(),
}
}
}
Expand All @@ -32,6 +39,7 @@ impl Deref for PublishFilterRef {
}
}

/// Implements [`PublishFilter`] for any ordinary function
impl<F> PublishFilter for F
where
F: Fn(&mut Publish, Option<&mut PublishProperties>) -> bool + Send + Sync,
Expand All @@ -41,6 +49,16 @@ where
}
}

/// Implements the conversion
/// ```rust
/// # use rumqttd::{protocol::{Publish, PublishProperties}, PublishFilterRef};
/// fn filter_static(packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
/// todo!()
/// }
///
/// let filter = PublishFilterRef::from(&filter_static);
/// # assert!(matches!(filter, PublishFilterRef::Static(_)));
/// ```
impl<F> From<&'static F> for PublishFilterRef
where
F: Fn(&mut Publish, Option<&mut PublishProperties>) -> bool + Send + Sync,
Expand All @@ -50,34 +68,52 @@ where
}
}

impl<T> From<Box<T>> for PublishFilterRef
/// Implements the conversion
/// ```rust
/// # use std::boxed::Box;
/// # use rumqttd::{protocol::{Publish, PublishProperties}, PublishFilter, PublishFilterRef};
/// #[derive(Clone)]
/// struct MyFilter {}
///
/// impl PublishFilter for MyFilter {
/// fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
/// todo!()
/// }
/// }
/// let boxed: Box<MyFilter> = Box::new(MyFilter {});
///
/// let filter = PublishFilterRef::from(boxed);
/// # assert!(matches!(filter, PublishFilterRef::Owned(_)));
/// ```
impl<T> From<Arc<T>> for PublishFilterRef
where
T: PublishFilter + 'static + Send + Sync,
{
fn from(value: Box<T>) -> Self {
Self::Owned(Arc::<T>::from(value))
fn from(value: Arc<T>) -> Self {
Self::Owned(value)
}
}
impl<T> From<Arc<T>> for PublishFilterRef

impl<T> From<Box<T>> for PublishFilterRef
where
T: PublishFilter + 'static + Send + Sync,
{
fn from(value: Arc<T>) -> Self {
Self::Owned(value)
fn from(value: Box<T>) -> Self {
Self::Owned(Arc::<T>::from(value))
}
}

#[cfg(test)]
mod tests {
use super::*;

fn filter_static(packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
fn filter_static(_packet: &mut Publish, _properties: Option<&mut PublishProperties>) -> bool {
true
}
struct Prejudiced(bool);

impl PublishFilter for Prejudiced {
fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
fn filter(&self, _packet: &mut Publish,_propertiess: Option<&mut PublishProperties>) -> bool {
self.0
}
}
Expand Down

0 comments on commit c909271

Please sign in to comment.