Skip to content

Commit

Permalink
add opentelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
timzaak committed Jul 16, 2024
1 parent e670785 commit cf86e00
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 51 deletions.
17 changes: 4 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ It provides a static web http server with cache and hot reload.
- Provide command line/npm package to deploy spa.
- Multiple configs for different domain.
- support Let's Encrypt
- support OpenTelemetry Trace
- provide JS SDK and command line client to interact with Server

## Document
Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- 提供 命令行/npm包 客户端,一行命令部署
- 每个域名可拥有独立的配置
- 支持 Let's Encrypt
- 支持 OpenTelemetry
- 提供JS SDK、命令行客户端与服务器进行交互。

## 文档
Expand Down
5 changes: 2 additions & 3 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ include = ["src/**/*", "Cargo.toml"]

[[bin]]
name = "spa-server"
path = "src/bin/main.rs"
path = "src/main.rs"

[dependencies]
entity = { path = "../entity" }
Expand Down Expand Up @@ -51,8 +51,7 @@ tracing-opentelemetry = "0.24"
opentelemetry = { version = "0.23", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.23", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.16", features = ["default", "metrics"] }
opentelemetry-stdout = { version = "0.4", features = ["trace"] }
opentelemetry-resource-detectors = { version = "0.2" }
#opentelemetry-resource-detectors = { version = "0.2" }
opentelemetry-semantic-conventions = { version = "0.15" }

# tokio cron
Expand Down
3 changes: 2 additions & 1 deletion server/examples/try_opentelemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async fn foo() {
}
*/

/*
use opentelemetry::{global, Key, KeyValue};
use opentelemetry_otlp::{ExportConfig, Protocol, TonicExporterBuilder, WithExportConfig};
use opentelemetry_resource_detectors::{OsResourceDetector, ProcessResourceDetector};
Expand Down Expand Up @@ -314,3 +314,4 @@ async fn test_instrument() {
info!(histogram.baz = 10, "histogram example",);
}
*/
20 changes: 0 additions & 20 deletions server/src/bin/main.rs

This file was deleted.

2 changes: 1 addition & 1 deletion server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Config {

#[derive(Deserialize, Debug, Clone, PartialEq)]
pub struct OpenTelemetry {
endpoint: String,
pub endpoint: String,
}

//TODO: create config with lots of default value
Expand Down
1 change: 0 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub mod tls;

mod acme;
pub mod cors;
mod otlp;
pub mod service;
pub mod static_file_filter;

Expand Down
27 changes: 27 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
mod otlp;

use tracing_core::Level;
use tracing_subscriber::EnvFilter;
use spa_server::config::Config;


#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::load()?;
let open_telemetry = &config.open_telemetry;
if let Some(open_telemetry) = open_telemetry.as_ref() {
otlp::init_otlp(open_telemetry.endpoint.clone())?;
} else {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.from_env_lossy(),
)
.init();
}


tracing::debug!("config load:{:?}", &config);
spa_server::run_server_with_config(config).await
}
38 changes: 38 additions & 0 deletions server/src/otlp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use opentelemetry::KeyValue;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::{Resource, runtime};
use opentelemetry_sdk::trace::config;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use opentelemetry_semantic_conventions::trace::SERVICE_VERSION;
use tracing_core::Level;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

pub fn init_otlp(endpoint:String) -> anyhow::Result<()>{
let resource = Resource::new(vec![
KeyValue::new(SERVICE_NAME, "spa-server"),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION"))
]);
let mut export_config = ExportConfig::default();
export_config.endpoint = endpoint;
let exporter = opentelemetry_otlp::new_exporter().tonic()
.with_export_config(export_config);

let tracer = opentelemetry_otlp::new_pipeline()
.tracing().with_exporter(exporter)
.with_trace_config(config().with_resource(resource))
.install_batch(runtime::Tokio)?;

let filter = EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.from_env_lossy();
tracing_subscriber::registry()
.with(filter)
.with(tracing_subscriber::fmt::layer().compact())
.with(OpenTelemetryLayer::new(tracer))
.init();
Ok(())

}
7 changes: 4 additions & 3 deletions server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::convert::Infallible;
use std::str::FromStr;
use std::sync::Arc;
use tracing::warn;
use tracing::{instrument, warn};
use warp::fs::{ArcPath, Conditionals};
use warp::http::Uri;
use warp::Reply;
Expand Down Expand Up @@ -55,6 +55,7 @@ fn alias_redirect(uri: &Uri, https: bool, host:&str, external_port:u16) -> warp:
}

// static file reply
#[instrument(skip(uri,host,domain_storage,origin_opt))]
async fn file_resp(req: &Request<Body>,uri:&Uri, host:&str, domain_storage: Arc<DomainStorage>, origin_opt: Option<Validated>) -> Result<Response<Body>, Infallible> {
let path = uri.path();
let mut resp = match get_cache_file(path, host, domain_storage.clone()).await {
Expand Down Expand Up @@ -109,7 +110,7 @@ fn get_authority(req:&Request<Body>) -> Option<Authority> {
})
}


#[instrument(skip(service_config, domain_storage,challenge_path, external_port))]
pub async fn create_http_service(
req: Request<Body>,
service_config: Arc<ServiceConfig>,
Expand Down Expand Up @@ -174,7 +175,7 @@ pub async fn create_http_service(

}


#[instrument(skip(service_config, domain_storage, external_port))]
pub async fn create_https_service(
req: Request<Body>,
service_config: Arc<ServiceConfig>,
Expand Down
10 changes: 5 additions & 5 deletions server/src/static_file_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io;
use tracing::debug;
use tracing::{debug, trace};
use warp::fs::{file_stream, optimal_buf_size, Cond, Conditionals};
use warp::http::{Response, StatusCode};

Expand Down Expand Up @@ -202,15 +202,15 @@ pub async fn cache_or_file_reply(
//false,false => cache without content-encoding
//false, true => file
if !client_accept_gzip && compressed {
debug!("{} hit disk", key);
trace!("{} hit disk", key);
Ok(file_reply(&item, path.as_ref(), range, modified).await)
} else {
let mut resp = cache_reply(item.as_ref(), bytes, range, modified);
if client_accept_gzip && compressed {
debug!("{} hit cache, compressed", key);
trace!("{} hit cache, compressed", key);
resp.headers_mut().typed_insert(ContentEncoding::gzip());
} else {
debug!("{} hit cache", key);
trace!("{} hit cache", key);
}
Ok(resp)
}
Expand Down Expand Up @@ -255,7 +255,7 @@ pub async fn get_cache_file(
) -> Option<(String, Arc<CacheItem>)> {
let _key = sanitize_path(tail)?;
let key = _key[1..].to_owned();
debug!("get file: {host}, tail:{_key}, fixed: {key}");
//debug!("get file: {host}, tail:{_key}, fixed: {key}");
if let Some(cache_item) = domain_storage.get_file(host, &key) {
Some((key, cache_item))
} else {
Expand Down
5 changes: 5 additions & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,10 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0.56"
console-subscriber = "0.3"
opentelemetry-stdout = { version = "0.4", features = ["trace"] }
tracing-opentelemetry = "0.24"
opentelemetry = { version = "0.23", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.23", features = ["rt-tokio", "metrics"] }

#rustls = "0.21.12" # from reqwest
#rustls-pemfile = "2.1.2"
1 change: 1 addition & 0 deletions tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async fn main() -> Result<()> {
"SPA_CONFIG",
get_test_dir().join("server_config.toml").display().to_string(),
);

tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
Expand Down
28 changes: 24 additions & 4 deletions tests/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ use spa_client::api::API;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::{env, fs, io};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::TracerProvider;
use opentelemetry_stdout::SpanExporter;
//use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;


pub const LOCAL_HOST: &str = "local.fornetcode.com";
pub const LOCAL_HOST2: &str = "local2.fornetcode.com";
Expand Down Expand Up @@ -67,13 +73,27 @@ pub fn run_server_with_config(config_file_name: &str) -> JoinHandle<()> {
"SPA_CONFIG",
get_test_dir().join(config_file_name).display().to_string(),
);
let _ = tracing_subscriber::fmt()
.with_env_filter(
let provider = TracerProvider::builder()
.with_simple_exporter(SpanExporter::default())
.build();
let tracer = provider.tracer("spa-server");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let _ = tracing_subscriber::registry()
.with(
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,spa_server=debug,spa_client=debug".into()),
)
.with_test_writer()
.try_init();
.with(tracing_subscriber::fmt::layer().compact())
.with(telemetry).try_init();

// let _ = tracing_subscriber::fmt()
// .with_env_filter(
// EnvFilter::try_from_default_env()
// .unwrap_or_else(|_| "info,spa_server=debug,spa_client=debug".into()),
// )
// .with_test_writer()
// .try_init();
tokio::spawn(async move {
let result = spa_server::run_server().await;
if result.is_err() {
Expand Down

0 comments on commit cf86e00

Please sign in to comment.