Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
upgrade capture's axum
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Apr 15, 2024
1 parent 5970846 commit 7398863
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 74 deletions.
187 changes: 140 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ members = [
anyhow = "1.0"
assert-json-diff = "2.0.2"
async-trait = "0.1.74"
axum = { version = "0.7.1", features = ["http2"] }
axum-client-ip = "0.4.1"
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum-client-ip = "0.6.0"
base64 = "0.21.1"
bytes = "1"
chrono = { version = "0.4" }
Expand Down Expand Up @@ -56,7 +56,7 @@ time = { version = "0.3.20", features = [
] }
tokio = { version = "1.34.0", features = ["full"] }
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
url = { version = "2.5.0 " }
Expand Down
2 changes: 1 addition & 1 deletion capture-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ futures = "0.3.29"
once_cell = "1.18.0"
rand = { workspace = true }
rdkafka = { workspace = true }
reqwest = "0.11.22"
reqwest = { workspace = true }
serde_json = { workspace = true }
3 changes: 1 addition & 2 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::net::TcpListener;
use std::time::Duration;

use envconfig::Envconfig;
Expand Down Expand Up @@ -76,6 +75,6 @@ async fn main() {
.init();

// Open the TCP port and start the server
let listener = TcpListener::bind(config.address).unwrap();
let listener = tokio::net::TcpListener::bind(config.address).await.expect("could not bind port");
serve(config, listener, shutdown()).await
}
13 changes: 7 additions & 6 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

use std::default::Default;
use std::net::{SocketAddr, TcpListener};
use std::net::{SocketAddr};
use std::num::NonZeroU32;
use std::str::FromStr;
use std::string::ToString;
Expand All @@ -17,6 +17,7 @@ use rdkafka::config::{ClientConfig, FromClientConfig};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::time::timeout;
use tracing::{debug, warn};
Expand Down Expand Up @@ -59,19 +60,19 @@ pub struct ServerHandle {
}

impl ServerHandle {
pub fn for_topic(topic: &EphemeralTopic) -> Self {
pub async fn for_topic(topic: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
Self::for_config(config)
Self::for_config(config).await
}
pub fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
pub async fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
let shutdown = notify.clone();

tokio::spawn(
async move { serve(config, listener, async { notify.notified().await }).await },
async move { serve(config, listener, async move { notify.notified().await }).await },
);
Self { addr, shutdown }
}
Expand Down
10 changes: 5 additions & 5 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn it_captures_one_event() -> Result<()> {
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!({
"token": token,
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn it_captures_a_batch() -> Result<()> {
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -176,7 +176,7 @@ async fn it_trims_distinct_id() -> Result<()> {
let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down
4 changes: 2 additions & 2 deletions capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "0.6.15" } # TODO: Bring up to date with the workspace.
axum = { workspace = true }
axum-client-ip = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down Expand Up @@ -36,4 +36,4 @@ envconfig = { workspace = true }

[dev-dependencies]
assert-json-diff = { workspace = true }
axum-test-helper = "0.2.0"
axum-test-helper = { git = "https://github.com/orphan-rs/axum-test-helper.git" } # TODO: remove, directly use reqwest like capture-server tests
3 changes: 2 additions & 1 deletion capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use bytes::Bytes;

use axum::Json;
use axum::{debug_handler, Json};
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::{HeaderMap, Method};
Expand Down Expand Up @@ -38,6 +38,7 @@ use crate::{
compression
)
)]
#[debug_handler]
pub async fn event(
state: State<router::State>,
InsecureClientIp(ip): InsecureClientIp,
Expand Down
3 changes: 2 additions & 1 deletion capture/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::time::Instant;

use axum::{extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse};
use axum::body::Body;
use metrics::counter;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};

Expand Down Expand Up @@ -38,7 +39,7 @@ pub fn setup_metrics_recorder() -> PrometheusHandle {
/// Middleware to record some common HTTP metrics
/// Generic over B to allow for arbitrary body types (eg Vec<u8>, Streams, a deserialized thing, etc)
/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57
pub async fn track_metrics<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse {
let start = Instant::now();

let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
Expand Down
9 changes: 4 additions & 5 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::future::Future;
use std::net::{SocketAddr, TcpListener};
use std::net::{SocketAddr};
use std::sync::Arc;

use time::Duration;
use tokio::net::TcpListener;

use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
Expand All @@ -15,7 +16,7 @@ use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
F: Future<Output = ()>,
F: Future<Output = ()> + Send + 'static,
{
let liveness = HealthRegistry::new("liveness");

Expand Down Expand Up @@ -80,9 +81,7 @@ where
// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`
tracing::info!("listening on {:?}", listener.local_addr().unwrap());
axum::Server::from_tcp(listener)
.unwrap()
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(shutdown)
.await
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion hook-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ chrono = { workspace = true }
envconfig = { workspace = true }
futures = "0.3"
hook-common = { path = "../hook-common" }
http = { version = "0.2" }
http = { workspace = true }
metrics = { workspace = true }
reqwest = { workspace = true }
sqlx = { workspace = true }
Expand Down

0 comments on commit 7398863

Please sign in to comment.