Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added AUTH packet support for enhanced authentication. #852

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
07b728c
Add enhanced authentication example.
ting-ms Apr 17, 2024
0e5d53a
Test re-authentication.
ting-ms Apr 17, 2024
c42c4e8
Update auth user interface.
ting-ms Apr 19, 2024
9b078bc
Improve logic.
ting-ms Apr 22, 2024
2d76573
Change auth_continue return error type to StateError.
ting-ms Apr 23, 2024
95150f6
Add length_calculation test.
ting-ms Apr 23, 2024
133159d
Improve error handling logic.
ting-ms Apr 24, 2024
7425387
Improve logic.
ting-ms Apr 24, 2024
fee30d6
Change auth to reauth.
ting-ms Apr 24, 2024
9fdf82c
Make auth_manager thread safe.
ting-ms Apr 24, 2024
83b4be2
Improve logic.
ting-ms Apr 24, 2024
4e07ecb
Improve logic.
ting-ms Apr 24, 2024
cb65cb9
Rename trait name "AuthManagerTrait" to "AuthManager".
ting-ms Apr 25, 2024
f624276
Use Mutex instead RefCell.
ting-ms Apr 26, 2024
213702b
Improve logic.
ting-ms Apr 26, 2024
c6101db
Merge branch 'bytebeamio:main' into auth_support
ting-ms Apr 28, 2024
7d37291
Improve logic.
ting-ms Apr 28, 2024
7cfc99b
Clean code.
ting-ms Apr 28, 2024
3e3b78d
Formatted with cargo fmt.
ting-ms Apr 28, 2024
13e08b1
Update CHANGELOG.md
ting-ms Apr 28, 2024
c9241f2
Add reauth and try_reauth to sync client.
ting-ms May 8, 2024
1ab0aac
Add sync_auth example.
ting-ms May 8, 2024
db2ef37
Format code.
ting-ms May 8, 2024
355ace3
Merge remote-tracking branch 'origin/main' into auth_support
ting-ms May 11, 2024
14eab60
Update auth tests.
ting-ms May 28, 2024
d5e5a06
Add a new auth test.
ting-ms May 28, 2024
81728b5
Merge remote-tracking branch 'origin/main' into auth_support
ting-ms May 29, 2024
4fd4e21
Update auth_continue API.
xiaocq2001 May 30, 2024
3cf2071
Merge branch 'auth_support' of https://github.com/ting-ms/rumqtt into…
xiaocq2001 May 30, 2024
c801b3c
Optimize.
xiaocq2001 May 30, 2024
000e712
Update sync example.
xiaocq2001 May 30, 2024
a885651
Merge pull request #2 from xiaocq2001/chxiao/auth_support
xiaocq2001 May 30, 2024
16cc05e
Fix Send issue between threads.
xiaocq2001 Jun 4, 2024
1c22e58
Merge pull request #3 from xiaocq2001/chxiao/auth_support
xiaocq2001 Jun 4, 2024
c7f0dc6
Fix testing.
xiaocq2001 Jun 5, 2024
66a78b4
Merge pull request #4 from xiaocq2001/chxiao/auth_support
xiaocq2001 Jun 5, 2024
8b349fc
Merge branch 'main' into auth_support
de-sh Jun 10, 2024
2c0a896
Fix compile warnings.
xiaocq2001 Jun 14, 2024
0c5783e
Merge pull request #5 from xiaocq2001/chxiao/auth_support
xiaocq2001 Jun 14, 2024
4906aee
Fix testing issue.
xiaocq2001 Jun 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
* `Auth` packet as per MQTT5 standards
* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`
* `MqttOptions::set_auth_manager` that allows users to set their own authentication manager that implements the `AuthManager` trait.
* `Client::reauth` that enables users to send `AUTH` packet for re-authentication purposes.


### Changed

Expand Down
3 changes: 3 additions & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use-rustls = ["dep:tokio-rustls", "dep:rustls-webpki", "dep:rustls-pemfile", "de
use-native-tls = ["dep:tokio-native-tls", "dep:native-tls"]
websocket = ["dep:async-tungstenite", "dep:ws_stream_tungstenite", "dep:http"]
proxy = ["dep:async-http-proxy"]
auth-scram = ["dep:scram"]

[dependencies]
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
Expand Down Expand Up @@ -50,6 +51,8 @@ url = { version = "2", default-features = false, optional = true }
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"
fixedbitset = "0.5.7"
#auth
scram = { version = "0.6.0", optional = true }

[dev-dependencies]
bincode = "1.3.3"
Expand Down
64 changes: 64 additions & 0 deletions rumqttc/examples/async_auth_oauth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use rumqttc::v5::mqttbytes::v5::AuthProperties;
use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use rumqttc::{TlsConfiguration, Transport};
use std::error::Error;
use std::sync::Arc;
use tokio::task;
use tokio_rustls::rustls::ClientConfig;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let pubsub_access_token = "";

let mut mqttoptions = MqttOptions::new("client1-session1", "MQTT hostname", 8883);
mqttoptions.set_authentication_method(Some("OAUTH2-JWT".to_string()));
mqttoptions.set_authentication_data(Some(pubsub_access_token.into()));

// Use rustls-native-certs to load root certificates from the operating system.
let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty();
root_cert_store.add_parsable_certificates(
rustls_native_certs::load_native_certs().expect("could not load platform certs"),
);

let client_config = ClientConfig::builder()
.with_root_certificates(root_cert_store)
.with_no_client_auth();

let transport = Transport::Tls(TlsConfiguration::Rustls(Arc::new(client_config.into())));

mqttoptions.set_transport(transport);

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

task::spawn(async move {
client.subscribe("topic1", QoS::AtLeastOnce).await.unwrap();
client
.publish("topic1", QoS::AtLeastOnce, false, "hello world")
.await
.unwrap();

// Re-authentication test.
let props = AuthProperties {
method: Some("OAUTH2-JWT".to_string()),
data: Some(pubsub_access_token.into()),
reason: None,
user_properties: Vec::new(),
};

client.reauth(Some(props)).await.unwrap();
});

loop {
let notification = eventloop.poll().await;

match notification {
Ok(event) => println!("{:?}", event),
Err(e) => {
println!("Error = {:?}", e);
break;
}
}
}

Ok(())
}
143 changes: 143 additions & 0 deletions rumqttc/examples/async_auth_scram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use bytes::Bytes;
use flume::bounded;
use rumqttc::v5::mqttbytes::{v5::AuthProperties, QoS};
use rumqttc::v5::{AsyncClient, AuthManager, MqttOptions};
#[cfg(feature = "auth-scram")]
use scram::client::ServerFirst;
#[cfg(feature = "auth-scram")]
use scram::ScramClient;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::task;

#[derive(Debug)]
struct ScramAuthManager<'a> {
user: &'a str,
password: &'a str,
#[cfg(feature = "auth-scram")]
scram: Option<ServerFirst<'a>>,
}

impl<'a> ScramAuthManager<'a> {
fn new(user: &'a str, password: &'a str) -> ScramAuthManager<'a> {
ScramAuthManager {
user,
password,
#[cfg(feature = "auth-scram")]
scram: None,
}
}

fn auth_start(&mut self) -> Result<Option<Bytes>, String> {
#[cfg(feature = "auth-scram")]
{
let scram = ScramClient::new(self.user, self.password, None);
let (scram, client_first) = scram.client_first();
self.scram = Some(scram);

Ok(Some(client_first.into()))
}

#[cfg(not(feature = "auth-scram"))]
Ok(Some("client first message".into()))
}
}

impl<'a> AuthManager for ScramAuthManager<'a> {
fn auth_continue(
&mut self,
auth_method: Option<String>,
auth_data: Option<Bytes>,
) -> Result<Option<Bytes>, String> {
#[cfg(feature = "auth-scram")]
{
// Check if the authentication method is SCRAM-SHA-256
if auth_method.unwrap() != "SCRAM-SHA-256" {
return Err("Invalid authentication method".to_string());
}

if self.scram.is_none() {
return Err("Invalid state".to_string());
}

let scram = self.scram.take().unwrap();

let auth_data = String::from_utf8(auth_data.unwrap().to_vec()).unwrap();

// Process the server first message and reassign the SCRAM state.
let scram = match scram.handle_server_first(&auth_data) {
Ok(scram) => scram,
Err(e) => return Err(e.to_string()),
};

// Get the client final message and reassign the SCRAM state.
let (_, client_final) = scram.client_final();

Ok(Some(client_final.into()))
}

#[cfg(not(feature = "auth-scram"))]
Ok(Some("client final message".into()))
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let mut authmanager = ScramAuthManager::new("user1", "123456");
let client_first = authmanager.auth_start().unwrap();
let authmanager = Arc::new(Mutex::new(authmanager));

let mut mqttoptions = MqttOptions::new("auth_test", "127.0.0.1", 1883);
mqttoptions.set_authentication_method(Some("SCRAM-SHA-256".to_string()));
mqttoptions.set_authentication_data(client_first);
mqttoptions.set_auth_manager(authmanager.clone());
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

let (tx, rx) = bounded(1);

task::spawn(async move {
client
.subscribe("rumqtt_auth/topic", QoS::AtLeastOnce)
.await
.unwrap();
client
.publish("rumqtt_auth/topic", QoS::AtLeastOnce, false, "hello world")
.await
.unwrap();

// Wait for the connection to be established.
rx.recv_async().await.unwrap();

// Reauthenticate using SCRAM-SHA-256
let client_first = authmanager.clone().lock().unwrap().auth_start().unwrap();
let properties = AuthProperties {
method: Some("SCRAM-SHA-256".to_string()),
data: client_first,
reason: None,
user_properties: Vec::new(),
};
client.reauth(Some(properties)).await.unwrap();
});

loop {
let notification = eventloop.poll().await;

match notification {
Ok(event) => {
println!("Event = {:?}", event);
match event {
rumqttc::v5::Event::Incoming(rumqttc::v5::Incoming::ConnAck(_)) => {
tx.send_async("Connected").await.unwrap();
}
_ => {}
}
}
Err(e) => {
println!("Error = {:?}", e);
break;
}
}
}

Ok(())
}
Loading