Skip to content

Commit

Permalink
Batch Event implementation for reqwest bindings
Browse files Browse the repository at this point in the history
Added `events(Vec<Event>)` to `RequestBuilderExt` to provide a batched
set of Events to send to an HTTP endpoint, and
`into_events() -> Result<Vec<Event>>` to ResponseExt to parse a batched
Event response.

I deliberately kept things simple, as I thought this would be a good
place to start with Batch support throughout the SDK, and the
implementation was simple enough, that there didn't seem to be much
opportunity for reusable libraries across the SDK.
That could be changed as more Batch support is provided across the SDK,
and opportunities for code reuse present themselves.

Signed-off-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
markmandel authored and jcrossley3 committed Jan 3, 2023
1 parent 20fd82a commit 38469b2
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/binding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub(crate) mod kafka {
}

pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json";
pub(crate) static CONTENT_TYPE: &str = "content-type";

fn header_prefix(prefix: &str, name: &str) -> String {
Expand Down
40 changes: 39 additions & 1 deletion src/binding/reqwest/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use reqwest_lib as reqwest;

use crate::binding::{
http::{header_prefix, SPEC_VERSION_HEADER},
CLOUDEVENTS_JSON_HEADER,
CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
};
use crate::event::SpecVersion;
use crate::message::{
Expand Down Expand Up @@ -72,18 +72,35 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
}

/// Method to fill a [`RequestBuilder`] with a batched [`Vec<Event>`].
pub fn events_to_request(
events: Vec<Event>,
request_builder: RequestBuilder,
) -> Result<RequestBuilder> {
let bytes = serde_json::to_vec(&events)?;
Ok(request_builder
.header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
.body(bytes))
}

/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait RequestBuilderExt: private::Sealed {
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
fn event(self, event: Event) -> Result<RequestBuilder>;
/// Write in this [`RequestBuilder`] the provided batched [`Vec<Event>`].
fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
}

impl RequestBuilderExt for RequestBuilder {
fn event(self, event: Event) -> Result<RequestBuilder> {
event_to_request(event, self)
}

fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
events_to_request(events, self)
}
}

// Sealing the RequestBuilderExt
Expand Down Expand Up @@ -183,4 +200,25 @@ mod tests {

m.assert();
}

#[tokio::test]
async fn test_batched_request() {
let input = vec![fixtures::v10::full_json_data_string_extension()];
let url = mockito::server_url();
let m = mock("POST", "/")
.match_header("content-type", "application/cloudevents-batch+json")
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
.create();

let client = reqwest::Client::new();
client
.post(&url)
.events(input)
.unwrap()
.send()
.await
.unwrap();

m.assert();
}
}
58 changes: 56 additions & 2 deletions src/binding/reqwest/client_response.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use reqwest_lib as reqwest;

use crate::binding::http;
use crate::binding;
use crate::message::{Error, Result};
use crate::Event;
use async_trait::async_trait;
use http::header;
use reqwest::Response;

/// Method to transform an incoming [`Response`] to [`Event`].
Expand All @@ -12,7 +13,26 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
let b = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;
http::to_event(&h, b.to_vec())
binding::http::to_event(&h, b.to_vec())
}

/// Method to transform an incoming [`Response`] to a batched [`Vec<Event>`]
pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
if res
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER))
.is_none()
{
return Err(Error::WrongEncoding {});
}

let bytes = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;

Ok(serde_json::from_slice(&bytes)?)
}

/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
Expand All @@ -22,13 +42,19 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
pub trait ResponseExt: private::Sealed {
/// Convert this [`Response`] to [`Event`].
async fn into_event(self) -> Result<Event>;
/// Convert this [`Response`] to a batched [`Vec<Event>`].
async fn into_events(self) -> Result<Vec<Event>>;
}

#[async_trait(?Send)]
impl ResponseExt for Response {
async fn into_event(self) -> Result<Event> {
response_to_event(self).await
}

async fn into_events(self) -> Result<Vec<Event>> {
response_to_events(self).await
}
}

// Sealing the ResponseExt
Expand All @@ -44,6 +70,7 @@ mod tests {
use super::*;
use mockito::mock;
use reqwest_lib as reqwest;
use std::vec;

use crate::test::fixtures;

Expand Down Expand Up @@ -133,4 +160,31 @@ mod tests {

assert_eq!(expected, res);
}

#[tokio::test]
async fn test_batched_response() {
let expected = vec![fixtures::v10::full_json_data_string_extension()];

let url = mockito::server_url();
let _m = mock("GET", "/")
.with_status(200)
.with_header(
"content-type",
"application/cloudevents-batch+json; charset=utf-8",
)
.with_body(serde_json::to_string(&expected).unwrap())
.create();

let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_events()
.await
.unwrap();

assert_eq!(expected, res);
}
}

0 comments on commit 38469b2

Please sign in to comment.