diff --git a/src/binding/mod.rs b/src/binding/mod.rs index a94fb3c3..13332123 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -52,6 +52,7 @@ pub(crate) mod kafka { } pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; +pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json"; pub(crate) static CONTENT_TYPE: &str = "content-type"; fn header_prefix(prefix: &str, name: &str) -> String { diff --git a/src/binding/reqwest/client_request.rs b/src/binding/reqwest/client_request.rs index d6c818ec..d417a68f 100644 --- a/src/binding/reqwest/client_request.rs +++ b/src/binding/reqwest/client_request.rs @@ -2,7 +2,7 @@ use reqwest_lib as reqwest; use crate::binding::{ http::{header_prefix, SPEC_VERSION_HEADER}, - CLOUDEVENTS_JSON_HEADER, + CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER, }; use crate::event::SpecVersion; use crate::message::{ @@ -72,18 +72,35 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder)) } +/// Method to fill a [`RequestBuilder`] with a batched [`Vec`]. +pub fn events_to_request( + events: Vec, + request_builder: RequestBuilder, +) -> Result { + let bytes = serde_json::to_vec(&events)?; + Ok(request_builder + .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER) + .body(bytes)) +} + /// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`]. /// /// This trait is sealed and cannot be implemented for types outside of this crate. pub trait RequestBuilderExt: private::Sealed { /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`]. fn event(self, event: Event) -> Result; + /// Write in this [`RequestBuilder`] the provided batched [`Vec`]. + fn events(self, events: Vec) -> Result; } impl RequestBuilderExt for RequestBuilder { fn event(self, event: Event) -> Result { event_to_request(event, self) } + + fn events(self, events: Vec) -> Result { + events_to_request(events, self) + } } // Sealing the RequestBuilderExt @@ -183,4 +200,25 @@ mod tests { m.assert(); } + + #[tokio::test] + async fn test_batched_request() { + let input = vec![fixtures::v10::full_json_data_string_extension()]; + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("content-type", "application/cloudevents-batch+json") + .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap())) + .create(); + + let client = reqwest::Client::new(); + client + .post(&url) + .events(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } } diff --git a/src/binding/reqwest/client_response.rs b/src/binding/reqwest/client_response.rs index f65f98f0..4304d967 100644 --- a/src/binding/reqwest/client_response.rs +++ b/src/binding/reqwest/client_response.rs @@ -1,9 +1,10 @@ use reqwest_lib as reqwest; -use crate::binding::http; +use crate::binding; use crate::message::{Error, Result}; use crate::Event; use async_trait::async_trait; +use http::header; use reqwest::Response; /// Method to transform an incoming [`Response`] to [`Event`]. @@ -12,7 +13,26 @@ pub async fn response_to_event(res: Response) -> Result { let b = res.bytes().await.map_err(|e| Error::Other { source: Box::new(e), })?; - http::to_event(&h, b.to_vec()) + binding::http::to_event(&h, b.to_vec()) +} + +/// Method to transform an incoming [`Response`] to a batched [`Vec`] +pub async fn response_to_events(res: Response) -> Result> { + if res + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER)) + .is_none() + { + return Err(Error::WrongEncoding {}); + } + + let bytes = res.bytes().await.map_err(|e| Error::Other { + source: Box::new(e), + })?; + + Ok(serde_json::from_slice(&bytes)?) } /// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. @@ -22,6 +42,8 @@ pub async fn response_to_event(res: Response) -> Result { pub trait ResponseExt: private::Sealed { /// Convert this [`Response`] to [`Event`]. async fn into_event(self) -> Result; + /// Convert this [`Response`] to a batched [`Vec`]. + async fn into_events(self) -> Result>; } #[async_trait(?Send)] @@ -29,6 +51,10 @@ impl ResponseExt for Response { async fn into_event(self) -> Result { response_to_event(self).await } + + async fn into_events(self) -> Result> { + response_to_events(self).await + } } // Sealing the ResponseExt @@ -44,6 +70,7 @@ mod tests { use super::*; use mockito::mock; use reqwest_lib as reqwest; + use std::vec; use crate::test::fixtures; @@ -133,4 +160,31 @@ mod tests { assert_eq!(expected, res); } + + #[tokio::test] + async fn test_batched_response() { + let expected = vec![fixtures::v10::full_json_data_string_extension()]; + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header( + "content-type", + "application/cloudevents-batch+json; charset=utf-8", + ) + .with_body(serde_json::to_string(&expected).unwrap()) + .create(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_events() + .await + .unwrap(); + + assert_eq!(expected, res); + } }