Skip to content

Commit

Permalink
Just use old http for warp and actix for now.
Browse files Browse the repository at this point in the history
Signed-off-by: Omar Zabala-Ferrera <73452461+ozabalaferrera@users.noreply.github.com>
  • Loading branch information
ozabalaferrera committed Jul 28, 2024
1 parent a3e1865 commit 11fb179
Show file tree
Hide file tree
Showing 17 changed files with 478 additions and 60 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ name = "cloudevents"

[features]
http-binding = ["async-trait", "bytes", "futures", "http"]
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http"]
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http-0-2"]
reqwest = ["reqwest-lib", "async-trait", "bytes", "http", "uuid/js"]
rdkafka = ["rdkafka-lib", "bytes", "futures"]
warp = ["warp-lib", "bytes", "http", "http-body-util", "hyper"]
warp = ["warp-lib", "bytes", "http-0-2", "http-body-util", "hyper-0-14"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body-util", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait", "http-body-util", "futures"]
nats = ["nats-lib"]
Expand All @@ -52,8 +52,8 @@ async-trait = { version = "^0.1.33", optional = true }
bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true, features = ["compat"]}
http = { version = "1.1", optional = true}
http-0-2 = { version = "0.2", optional = true, package = "http"}
axum-lib = { version = "^0.7", optional = true, package="axum"}
http-body = { version = "^0.4", optional = true }
http-body-util = {version = "^0.1", optional = true}
poem-lib = { version = "=3.0.1", optional = true, package = "poem" }
nats-lib = { version = "0.25.0", optional = true, package = "nats" }
Expand All @@ -66,6 +66,7 @@ web-sys = { version = "^0.3", features = ["Window", "Location"] }

[target.'cfg(not(target_os = "wasi"))'.dependencies]
hyper = { version = "^1.3", optional = true, package="hyper" }
hyper-0-14 = { version = "^0.14", optional = true, package = "hyper"}

[target.'cfg(all(target_arch = "wasm32", target_os = "wasi"))'.dependencies]
hyper_wasi = { version = "0.15", features = ["full"], optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/binding/actix/server_request.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::binding::http::{to_event, Headers};
use crate::binding::http_0_2::{to_event, Headers};
use crate::Event;
use actix_web::dev::Payload;
use actix_web::web::BytesMut;
use actix_web::{web, HttpRequest};
use async_trait::async_trait;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use http::header::{AsHeaderName, HeaderName, HeaderValue};
use http;
use http_0_2 as http;

/// Implement Headers for the actix HeaderMap
impl<'a> Headers<'a> for actix_http::header::HeaderMap {
Expand Down
4 changes: 2 additions & 2 deletions src/binding/actix/server_response.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::binding::http::{Builder, Serializer};
use crate::binding::http_0_2::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Result};
use crate::Event;
use actix_web::http::StatusCode;
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder};
use http;
use http_0_2 as http;

impl Builder<HttpResponse> for HttpResponseBuilder {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
Expand Down
2 changes: 0 additions & 2 deletions src/binding/axum/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ where

#[cfg(test)]
mod tests {
use axum_lib;

use super::*;
use axum::body::Body;
use axum::extract::FromRequest;
Expand Down
1 change: 0 additions & 1 deletion src/binding/http/builder/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use hyper::body::Body;
use std::cell::Cell;

use crate::binding::http::{Builder, Serializer};
Expand Down
2 changes: 1 addition & 1 deletion src/binding/http/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(any(feature = "hyper", feature = "hyper-1-3"))]
#[cfg(feature = "hyper")]
pub mod adapter;

use crate::message::Result;
Expand Down
2 changes: 1 addition & 1 deletion src/binding/http/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
use crate::test::fixtures;
use bytes::Bytes;
use http::Request;

use http;
use std::convert::TryFrom;

Expand Down
38 changes: 38 additions & 0 deletions src/binding/http_0_2/builder/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use http::Response;
use http_0_2 as http;
use hyper::body::Body;
use hyper_0_14 as hyper;
use std::cell::Cell;

use crate::binding::http_0_2::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Error, Result};
use crate::Event;

struct Adapter {
builder: Cell<http::response::Builder>,
}

impl Builder<Response<Body>> for Adapter {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
self.builder.set(self.builder.take().header(key, value));
}
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
self.builder.take().body(Body::from(bytes)).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})
}
fn finish(&mut self) -> Result<Response<Body>> {
self.body(Vec::new())
}
}

pub fn to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
BinaryDeserializer::deserialize_binary(
event,
Serializer::new(Adapter {
builder: Cell::new(http::Response::builder()),
}),
)
}
11 changes: 11 additions & 0 deletions src/binding/http_0_2/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[cfg(feature = "hyper-0-14")]
pub mod adapter;

use crate::message::Result;
use http_0_2 as http;

pub trait Builder<R> {
fn header(&mut self, key: &str, value: http::header::HeaderValue);
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;
fn finish(&mut self) -> Result<R>;
}
114 changes: 114 additions & 0 deletions src/binding/http_0_2/deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use super::{Headers, SPEC_VERSION_HEADER};
use crate::{
binding::CLOUDEVENTS_JSON_HEADER,
event::SpecVersion,
header_value_to_str, message,
message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer,
StructuredSerializer,
},
};
use http_0_2 as http;
use std::convert::TryFrom;

pub struct Deserializer<'a, T: Headers<'a>> {
headers: &'a T,
body: Vec<u8>,
}

impl<'a, T: Headers<'a>> Deserializer<'a, T> {
pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
Deserializer { headers, body }
}
}

impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
self,
mut visitor: V,
) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}

let spec_version = SpecVersion::try_from(
self.headers
.get(SPEC_VERSION_HEADER)
.map(|a| header_value_to_str!(a))
.unwrap()?,
)?;

let attributes = spec_version.attribute_names();

visitor = visitor.set_spec_version(spec_version)?;

for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
let key = hn.as_str();
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
}) {
let name = &hn.as_str()["ce-".len()..];

if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
)?
}
}

if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(
header_value_to_str!(hv)?,
)),
)?
}

if !self.body.is_empty() {
visitor.end_with_data(self.body)
} else {
visitor.end()
}
}
}

impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
self,
visitor: V,
) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body)
}
}

impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
fn encoding(&self) -> Encoding {
if self
.headers
.get(http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
.is_some()
{
Encoding::STRUCTURED
} else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}
22 changes: 22 additions & 0 deletions src/binding/http_0_2/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
use http_0_2 as http;

/// Any http library should be able to use the
/// [`to_event`](super::to_event) function with an implementation of
/// this trait.
pub trait Headers<'a> {
type Iterator: Iterator<Item = (&'a HeaderName, &'a HeaderValue)>;
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue>;
fn iter(&'a self) -> Self::Iterator;
}

/// Implemention for the HeaderMap used by warp/reqwest
impl<'a> Headers<'a> for HeaderMap<HeaderValue> {
type Iterator = http::header::Iter<'a, HeaderValue>;
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue> {
self.get(name)
}
fn iter(&'a self) -> Self::Iterator {
self.iter()
}
}
73 changes: 73 additions & 0 deletions src/binding/http_0_2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
pub mod builder;
pub mod deserializer;
mod headers;

use crate::{
message::{Error, MessageDeserializer},
Event,
};
use deserializer::Deserializer;
pub use headers::Headers;
mod serializer;

pub use builder::Builder;
use core::convert::TryFrom;
use http::Response;
use http_0_2 as http;
pub use serializer::Serializer;
use std::convert::TryInto;
use std::fmt::Debug;

pub static SPEC_VERSION_HEADER: &str = "ce-specversion";

/// Turn a pile of HTTP headers and a body into a CloudEvent
pub fn to_event<'a, T: Headers<'a>>(
headers: &'a T,
body: Vec<u8>,
) -> std::result::Result<Event, Error> {
MessageDeserializer::into_event(Deserializer::new(headers, body))
}

pub fn header_prefix(name: &str) -> String {
super::header_prefix("ce-", name)
}

impl<T> TryFrom<Response<T>> for Event
where
T: TryInto<Vec<u8>>,
<T as TryInto<Vec<u8>>>::Error: Debug,
{
type Error = crate::message::Error;

fn try_from(response: Response<T>) -> Result<Self, Self::Error> {
let headers = response.headers().to_owned();
let body = T::try_into(response.into_body()).unwrap();

to_event(&headers, body)
}
}

#[cfg(test)]
mod tests {
use crate::test::fixtures;
use crate::Event;
use core::convert::TryFrom;
use http::Response;
use http_0_2 as http;

#[test]
fn test_response_to_event() {
let event = fixtures::v10::minimal_string_extension();

let response = Response::builder()
.header("ce-id", fixtures::id())
.header("ce-source", fixtures::source())
.header("ce-type", fixtures::ty())
.header("ce-specversion", "1.0")
.header("ce-someint", "10")
.body(Vec::new())
.unwrap();

assert_eq!(event, Event::try_from(response).unwrap());
}
}
Loading

0 comments on commit 11fb179

Please sign in to comment.