From 38469b245dbeafdeade7f65791ef8c6bad4f624e Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Tue, 20 Dec 2022 15:03:34 -0800 Subject: [PATCH] Batch Event implementation for reqwest bindings Added `events(Vec)` to `RequestBuilderExt` to provide a batched set of Events to send to an HTTP endpoint, and `into_events() -> Result>` to ResponseExt to parse a batched Event response. I deliberately kept things simple, as I thought this would be a good place to start with Batch support throughout the SDK, and the implementation was simple enough, that there didn't seem to be much opportunity for reusable libraries across the SDK. That could be changed as more Batch support is provided across the SDK, and opportunities for code reuse present themselves. Signed-off-by: Mark Mandel --- src/binding/mod.rs | 1 + src/binding/reqwest/client_request.rs | 40 +++++++++++++++++- src/binding/reqwest/client_response.rs | 58 +++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 3 deletions(-) diff --git a/src/binding/mod.rs b/src/binding/mod.rs index a94fb3c3..13332123 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -52,6 +52,7 @@ pub(crate) mod kafka { } pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; +pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json"; pub(crate) static CONTENT_TYPE: &str = "content-type"; fn header_prefix(prefix: &str, name: &str) -> String { diff --git a/src/binding/reqwest/client_request.rs b/src/binding/reqwest/client_request.rs index d6c818ec..d417a68f 100644 --- a/src/binding/reqwest/client_request.rs +++ b/src/binding/reqwest/client_request.rs @@ -2,7 +2,7 @@ use reqwest_lib as reqwest; use crate::binding::{ http::{header_prefix, SPEC_VERSION_HEADER}, - CLOUDEVENTS_JSON_HEADER, + CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER, }; use crate::event::SpecVersion; use crate::message::{ @@ -72,18 +72,35 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder)) } +/// Method to fill a [`RequestBuilder`] with a batched [`Vec`]. +pub fn events_to_request( + events: Vec, + request_builder: RequestBuilder, +) -> Result { + let bytes = serde_json::to_vec(&events)?; + Ok(request_builder + .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER) + .body(bytes)) +} + /// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`]. /// /// This trait is sealed and cannot be implemented for types outside of this crate. pub trait RequestBuilderExt: private::Sealed { /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`]. fn event(self, event: Event) -> Result; + /// Write in this [`RequestBuilder`] the provided batched [`Vec`]. + fn events(self, events: Vec) -> Result; } impl RequestBuilderExt for RequestBuilder { fn event(self, event: Event) -> Result { event_to_request(event, self) } + + fn events(self, events: Vec) -> Result { + events_to_request(events, self) + } } // Sealing the RequestBuilderExt @@ -183,4 +200,25 @@ mod tests { m.assert(); } + + #[tokio::test] + async fn test_batched_request() { + let input = vec![fixtures::v10::full_json_data_string_extension()]; + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("content-type", "application/cloudevents-batch+json") + .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap())) + .create(); + + let client = reqwest::Client::new(); + client + .post(&url) + .events(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } } diff --git a/src/binding/reqwest/client_response.rs b/src/binding/reqwest/client_response.rs index f65f98f0..4304d967 100644 --- a/src/binding/reqwest/client_response.rs +++ b/src/binding/reqwest/client_response.rs @@ -1,9 +1,10 @@ use reqwest_lib as reqwest; -use crate::binding::http; +use crate::binding; use crate::message::{Error, Result}; use crate::Event; use async_trait::async_trait; +use http::header; use reqwest::Response; /// Method to transform an incoming [`Response`] to [`Event`]. @@ -12,7 +13,26 @@ pub async fn response_to_event(res: Response) -> Result { let b = res.bytes().await.map_err(|e| Error::Other { source: Box::new(e), })?; - http::to_event(&h, b.to_vec()) + binding::http::to_event(&h, b.to_vec()) +} + +/// Method to transform an incoming [`Response`] to a batched [`Vec`] +pub async fn response_to_events(res: Response) -> Result> { + if res + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER)) + .is_none() + { + return Err(Error::WrongEncoding {}); + } + + let bytes = res.bytes().await.map_err(|e| Error::Other { + source: Box::new(e), + })?; + + Ok(serde_json::from_slice(&bytes)?) } /// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. @@ -22,6 +42,8 @@ pub async fn response_to_event(res: Response) -> Result { pub trait ResponseExt: private::Sealed { /// Convert this [`Response`] to [`Event`]. async fn into_event(self) -> Result; + /// Convert this [`Response`] to a batched [`Vec`]. + async fn into_events(self) -> Result>; } #[async_trait(?Send)] @@ -29,6 +51,10 @@ impl ResponseExt for Response { async fn into_event(self) -> Result { response_to_event(self).await } + + async fn into_events(self) -> Result> { + response_to_events(self).await + } } // Sealing the ResponseExt @@ -44,6 +70,7 @@ mod tests { use super::*; use mockito::mock; use reqwest_lib as reqwest; + use std::vec; use crate::test::fixtures; @@ -133,4 +160,31 @@ mod tests { assert_eq!(expected, res); } + + #[tokio::test] + async fn test_batched_response() { + let expected = vec![fixtures::v10::full_json_data_string_extension()]; + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header( + "content-type", + "application/cloudevents-batch+json; charset=utf-8", + ) + .with_body(serde_json::to_string(&expected).unwrap()) + .create(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_events() + .await + .unwrap(); + + assert_eq!(expected, res); + } }