Skip to content

Commit

Permalink
Add separate old http feature and prep for PR.
Browse files Browse the repository at this point in the history
Signed-off-by: Omar Zabala-Ferrera <73452461+ozabalaferrera@users.noreply.github.com>
  • Loading branch information
ozabalaferrera committed Jul 28, 2024
1 parent 11fb179 commit 1595968
Show file tree
Hide file tree
Showing 24 changed files with 160 additions and 421 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cloudevents-sdk"
version = "0.7.0"
version = "0.8.0"
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
license-file = "LICENSE"
edition = "2018"
Expand All @@ -23,6 +23,7 @@ name = "cloudevents"

[features]
http-binding = ["async-trait", "bytes", "futures", "http"]
http-0-2-binding = ["async-trait", "bytes", "futures", "http-0-2"]
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http-0-2"]
reqwest = ["reqwest-lib", "async-trait", "bytes", "http", "uuid/js"]
rdkafka = ["rdkafka-lib", "bytes", "futures"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ enabling your Protocol Binding of choice:

```toml
[dependencies]
cloudevents-sdk = { version = "0.7.0" }
cloudevents-sdk = { version = "0.8.0" }
```

Now you can start creating events:
Expand Down
12 changes: 4 additions & 8 deletions src/binding/actix/server_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use http_0_2 as http;

/// Implement Headers for the actix HeaderMap
impl<'a> Headers<'a> for actix_http::header::HeaderMap {
type Iterator =
Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
self.get(key.as_str())
}
Expand All @@ -29,15 +28,13 @@ pub async fn request_to_event(
while let Some(item) = payload.next().await {
bytes.extend_from_slice(&item?);
}
to_event(req.headers(), bytes.to_vec())
.map_err(actix_web::error::ErrorBadRequest)
to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest)
}

/// So that an actix-web handler may take an Event parameter
impl actix_web::FromRequest for Event {
type Error = actix_web::Error;
type Future =
LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;
type Future = LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;

fn from_request(r: &HttpRequest, p: &mut Payload) -> Self::Future {
let request = r.to_owned();
Expand Down Expand Up @@ -145,8 +142,7 @@ mod tests {
"datacontenttype": "application/json",
"data": fixtures::json_data()
});
let bytes = serde_json::to_string(&payload)
.expect("Failed to serialize test data to json");
let bytes = serde_json::to_string(&payload).expect("Failed to serialize test data to json");

let expected = fixtures::v10::full_json_data_string_extension();

Expand Down
10 changes: 2 additions & 8 deletions src/binding/actix/server_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,11 @@ impl actix_web::Responder for Event {
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait HttpResponseBuilderExt: private::Sealed {
/// Fill this [`HttpResponseBuilder`] with an [`Event`].
fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::Error>;
fn event(self, event: Event) -> std::result::Result<HttpResponse, actix_web::Error>;
}

impl HttpResponseBuilderExt for HttpResponseBuilder {
fn event(
self,
event: Event,
) -> std::result::Result<HttpResponse, actix_web::Error> {
fn event(self, event: Event) -> std::result::Result<HttpResponse, actix_web::Error> {
event_to_response(event, self)
}
}
Expand Down
18 changes: 7 additions & 11 deletions src/binding/axum/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@ where
{
type Rejection = Response;

async fn from_request(
req: Request,
_state: &S,
) -> Result<Self, Self::Rejection> {
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let (parts, body) = req.into_parts();

let body =
axum::body::to_bytes(body, usize::MAX).await.map_err(|e| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(e.to_string()))
.unwrap()
})?;
let body = axum::body::to_bytes(body, usize::MAX).await.map_err(|e| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(e.to_string()))
.unwrap()
})?;

to_event(&parts.headers, body.to_vec()).map_err(|e| {
Response::builder()
Expand Down
4 changes: 1 addition & 3 deletions src/binding/http/builder/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ impl Builder<Response<BoxBody>> for Adapter {
}
}

pub fn to_response(
event: Event,
) -> std::result::Result<Response<BoxBody>, Error> {
pub fn to_response(event: Event) -> std::result::Result<Response<BoxBody>, Error> {
BinaryDeserializer::deserialize_binary(
event,
Serializer::new(Adapter {

Check failure on line 38 in src/binding/http/builder/adapter.rs

View workflow job for this annotation

GitHub Actions / stable / wasm32-wasi

the trait bound `Adapter: http::builder::Builder<_>` is not satisfied

Check failure on line 38 in src/binding/http/builder/adapter.rs

View workflow job for this annotation

GitHub Actions / nightly / wasm32-wasi

the trait bound `Adapter: http::builder::Builder<_>` is not satisfied
Expand Down
27 changes: 7 additions & 20 deletions src/binding/http/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use crate::{
event::SpecVersion,
header_value_to_str, message,
message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer,
StructuredSerializer,
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
},
};

Expand All @@ -25,10 +24,7 @@ impl<'a, T: Headers<'a>> Deserializer<'a, T> {
}

impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
self,
mut visitor: V,
) -> Result<R> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
Expand All @@ -53,26 +49,20 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}

if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}

Expand All @@ -85,10 +75,7 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
}

impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
self,
visitor: V,
) -> Result<R> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
Expand Down
54 changes: 14 additions & 40 deletions src/binding/http/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::binding::{
use crate::event::SpecVersion;
use crate::message::BinaryDeserializer;
use crate::message::{
BinarySerializer, Error, MessageAttributeValue, Result,
StructuredSerializer,
BinarySerializer, Error, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use http::Request;
Expand All @@ -20,11 +19,11 @@ use std::fmt::Debug;

macro_rules! str_to_header_value {
($header_value:expr) => {
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(
|e| crate::message::Error::Other {
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
},
)
}
})
};
}

Expand All @@ -47,22 +46,14 @@ impl<T> BinarySerializer<T> for Serializer<T> {
Ok(self)
}

fn set_attribute(
self,
name: &str,
value: MessageAttributeValue,
) -> Result<Self> {
fn set_attribute(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder
.borrow_mut()
.header(&header_prefix(name), str_to_header_value!(value)?);
Ok(self)
}

fn set_extension(
self,
name: &str,
value: MessageAttributeValue,
) -> Result<Self> {
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder
.borrow_mut()
.header(&header_prefix(name), str_to_header_value!(value)?);
Expand All @@ -89,8 +80,7 @@ impl<T> StructuredSerializer<T> for Serializer<T> {
}
}

impl<T> BinarySerializer<http::request::Request<Option<T>>>
for http::request::Builder
impl<T> BinarySerializer<http::request::Request<Option<T>>> for http::request::Builder
where
T: TryFrom<Vec<u8>>,
<T as TryFrom<Vec<u8>>>::Error: Debug,
Expand All @@ -100,30 +90,19 @@ where
Ok(self)
}

fn set_attribute(
mut self,
name: &str,
value: MessageAttributeValue,
) -> Result<Self> {
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self = self.header(key, &value.to_string());
Ok(self)
}

fn set_extension(
mut self,
name: &str,
value: MessageAttributeValue,
) -> Result<Self> {
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self = self.header(key, &value.to_string());
Ok(self)
}

fn end_with_data(
self,
bytes: Vec<u8>,
) -> Result<http::request::Request<Option<T>>> {
fn end_with_data(self, bytes: Vec<u8>) -> Result<http::request::Request<Option<T>>> {
let body = T::try_from(bytes).unwrap();
self.body(Some(body)).map_err(|e| Error::Other {
source: Box::new(e),
Expand All @@ -145,10 +124,7 @@ where
type Error = crate::message::Error;

fn try_from(event: Event) -> Result<Self> {
BinaryDeserializer::deserialize_binary(
event,
http::request::Builder::new(),
)
BinaryDeserializer::deserialize_binary(event, http::request::Builder::new())
}
}

Expand All @@ -164,8 +140,7 @@ mod tests {
#[test]
fn test_event_to_http_request() {
let event = fixtures::v10::minimal_string_extension();
let request: Request<Option<Vec<u8>>> =
Request::try_from(event).unwrap();
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();

assert_eq!(request.headers()["ce-id"], "0001");
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
Expand All @@ -174,8 +149,7 @@ mod tests {
#[test]
fn test_event_to_bytes_body() {
let event = fixtures::v10::full_binary_json_data_string_extension();
let request: Request<Option<Vec<u8>>> =
Request::try_from(event).unwrap();
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();

assert_eq!(request.headers()["ce-id"], "0001");
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
Expand Down
9 changes: 5 additions & 4 deletions src/binding/http_0_2/builder/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ impl Builder<Response<Body>> for Adapter {
self.builder.set(self.builder.take().header(key, value));
}
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
self.builder.take().body(Body::from(bytes)).map_err(|e| {
crate::message::Error::Other {
self.builder
.take()
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
}
})
})
}
fn finish(&mut self) -> Result<Response<Body>> {
self.body(Vec::new())
Expand Down
27 changes: 7 additions & 20 deletions src/binding/http_0_2/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use crate::{
event::SpecVersion,
header_value_to_str, message,
message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer,
StructuredSerializer,
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
},
};
use http_0_2 as http;
Expand All @@ -24,10 +23,7 @@ impl<'a, T: Headers<'a>> Deserializer<'a, T> {
}

impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
self,
mut visitor: V,
) -> Result<R> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
Expand All @@ -52,26 +48,20 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}

if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}

Expand All @@ -84,10 +74,7 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
}

impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
self,
visitor: V,
) -> Result<R> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
Expand Down
Loading

0 comments on commit 1595968

Please sign in to comment.