Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
cleanup and upgrade deps (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Apr 15, 2024
1 parent 92018ab commit 56d4615
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 317 deletions.
545 changes: 292 additions & 253 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 7 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,27 @@ members = [
"hook-janitor",
]

# [profile.release]
# debug = 2 # https://www.polarsignals.com/docs/rust

[workspace.dependencies]
anyhow = "1.0"
assert-json-diff = "2.0.2"
async-trait = "0.1.74"
axum = { version = "0.7.1", features = ["http2"] }
axum-client-ip = "0.4.1"
base64 = "0.21.1"
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
chrono = { version = "0.4" }
envconfig = "0.10.0"
eyre = "0.6.9"
flate2 = "1.0"
futures = { version = "0.3.29" }
governor = { version = "0.5.1", features = ["dashmap"] }
http = { version = "0.2" }
http = { version = "1.1.0" }
http-body-util = "0.1.0"
metrics = "0.22.0"
metrics-exporter-prometheus = "0.13.0"
metrics-exporter-prometheus = "0.14.0"
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
regex = "1.10.2"
reqwest = { version = "0.11" }
reqwest = { version = "0.12.3" }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand All @@ -57,8 +53,7 @@ time = { version = "0.3.20", features = [
] }
tokio = { version = "1.34.0", features = ["full"] }
tower = "0.4.13"
tower_governor = "0.0.4"
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
url = { version = "2.5.0 " }
Expand Down
14 changes: 6 additions & 8 deletions capture-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum = { workspace = true }
capture = { path = "../capture" }
envconfig = { workspace = true }
opentelemetry = { version = "0.21.0", features = ["trace"]}
opentelemetry-otlp = "0.14.0"
opentelemetry_sdk = { version = "0.21.0", features = ["trace", "rt-tokio"] }
time = { workspace = true }
opentelemetry = { version = "0.22.0", features = ["trace"]}
opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = "0.22.0"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[dev-dependencies]
Expand All @@ -23,5 +21,5 @@ futures = "0.3.29"
once_cell = "1.18.0"
rand = { workspace = true }
rdkafka = { workspace = true }
reqwest = "0.11.22"
serde_json = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
5 changes: 3 additions & 2 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::net::TcpListener;
use std::time::Duration;

use envconfig::Envconfig;
Expand Down Expand Up @@ -76,6 +75,8 @@ async fn main() {
.init();

// Open the TCP port and start the server
let listener = TcpListener::bind(config.address).unwrap();
let listener = tokio::net::TcpListener::bind(config.address)
.await
.expect("could not bind port");
serve(config, listener, shutdown()).await
}
17 changes: 9 additions & 8 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

use std::default::Default;
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::string::ToString;
Expand All @@ -17,6 +17,7 @@ use rdkafka::config::{ClientConfig, FromClientConfig};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::time::timeout;
use tracing::{debug, warn};
Expand Down Expand Up @@ -59,20 +60,20 @@ pub struct ServerHandle {
}

impl ServerHandle {
pub fn for_topic(topic: &EphemeralTopic) -> Self {
pub async fn for_topic(topic: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
Self::for_config(config)
Self::for_config(config).await
}
pub fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
pub async fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
let shutdown = notify.clone();

tokio::spawn(
async move { serve(config, listener, async { notify.notified().await }).await },
);
tokio::spawn(async move {
serve(config, listener, async move { notify.notified().await }).await
});
Self { addr, shutdown }
}

Expand Down
10 changes: 5 additions & 5 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn it_captures_one_event() -> Result<()> {
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!({
"token": token,
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn it_captures_a_batch() -> Result<()> {
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -176,7 +176,7 @@ async fn it_trims_distinct_id() -> Result<()> {
let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down
9 changes: 2 additions & 7 deletions capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "0.6.15" } # TODO: Bring up to date with the workspace.
axum = { workspace = true }
axum-client-ip = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
governor = { workspace = true }
tower_governor = { workspace = true }
time = { workspace = true }
tower-http = { workspace = true }
bytes = { workspace = true }
Expand All @@ -35,10 +33,7 @@ redis = { version = "0.23.3", features = [
"cluster-async",
] }
envconfig = { workspace = true }
dashmap = "5.5.3"

[dev-dependencies]
assert-json-diff = { workspace = true }
axum-test-helper = "0.2.0"
mockall = "0.11.2"
redis-test = "0.2.3"
axum-test-helper = { git = "https://github.com/orphan-rs/axum-test-helper.git" } # TODO: remove, directly use reqwest like capture-server tests
3 changes: 2 additions & 1 deletion capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use bytes::Bytes;

use axum::Json;
use axum::{debug_handler, Json};
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::{HeaderMap, Method};
Expand Down Expand Up @@ -38,6 +38,7 @@ use crate::{
compression
)
)]
#[debug_handler]
pub async fn event(
state: State<router::State>,
InsecureClientIp(ip): InsecureClientIp,
Expand Down
3 changes: 2 additions & 1 deletion capture/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::time::Instant;

use axum::body::Body;
use axum::{extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse};
use metrics::counter;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
Expand Down Expand Up @@ -38,7 +39,7 @@ pub fn setup_metrics_recorder() -> PrometheusHandle {
/// Middleware to record some common HTTP metrics
/// Generic over B to allow for arbitrary body types (eg Vec<u8>, Streams, a deserialized thing, etc)
/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57
pub async fn track_metrics<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse {
let start = Instant::now();

let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
Expand Down
18 changes: 10 additions & 8 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::future::Future;
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use std::sync::Arc;

use time::Duration;
use tokio::net::TcpListener;

use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
Expand All @@ -15,7 +16,7 @@ use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
F: Future<Output = ()>,
F: Future<Output = ()> + Send + 'static,
{
let liveness = HealthRegistry::new("liveness");

Expand Down Expand Up @@ -80,10 +81,11 @@ where
// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`
tracing::info!("listening on {:?}", listener.local_addr().unwrap());
axum::Server::from_tcp(listener)
.unwrap()
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(shutdown)
.await
.unwrap()
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(shutdown)
.await
.unwrap()
}
1 change: 0 additions & 1 deletion hook-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ eyre = { workspace = true }
hook-common = { path = "../hook-common" }
http-body-util = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions hook-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ http = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
reqwest = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
Expand Down
6 changes: 0 additions & 6 deletions hook-janitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,12 @@ envconfig = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
hook-common = { path = "../hook-common" }
http-body-util = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
rdkafka = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
4 changes: 1 addition & 3 deletions hook-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ chrono = { workspace = true }
envconfig = { workspace = true }
futures = "0.3"
hook-common = { path = "../hook-common" }
http = { version = "0.2" }
http = { workspace = true }
metrics = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
Expand Down

0 comments on commit 56d4615

Please sign in to comment.