Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the example installing OpenTelemetryLayer into a global subscriber #175

Open
wants to merge 4 commits into
base: v0.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions examples/opentelemetry-otlp.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use opentelemetry::{global, trace::TracerProvider, Key, KeyValue};
use opentelemetry::{global, trace::TracerProvider as _, Key, KeyValue};
use opentelemetry_sdk::{
metrics::{
reader::DefaultTemporalitySelector, Aggregation, Instrument, MeterProviderBuilder,
PeriodicReader, SdkMeterProvider, Stream,
},
runtime,
trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer},
trace::{BatchConfig, RandomIdGenerator, Sampler, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
Expand Down Expand Up @@ -88,9 +88,9 @@ fn init_meter_provider() -> SdkMeterProvider {
meter_provider
}

// Construct Tracer for OpenTelemetryLayer
fn init_tracer() -> Tracer {
let provider = opentelemetry_otlp::new_pipeline()
// Construct TracerProvider for OpenTelemetryLayer
fn init_tracer_provider() -> TracerProvider {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
Expand All @@ -105,18 +105,24 @@ fn init_tracer() -> Tracer {
.with_batch_config(BatchConfig::default())
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(runtime::Tokio)
.unwrap();

global::set_tracer_provider(provider.clone());
provider.tracer("tracing-otel-subscriber")
.unwrap()
}

// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
fn init_tracing_subscriber() -> OtelGuard {
let tracer_provider = init_tracer_provider();
let meter_provider = init_meter_provider();
let tracer = init_tracer();

let tracer = tracer_provider.tracer("tracing-otel-subscriber");

tracing_subscriber::registry()
// The global level filter prevents the exporter network stack
// from reentering the globally installed OpenTelemetryLayer with
// its own spans while exporting, as the libraries should not use
// tracing levels below DEBUG. If the OpenTelemetry layer needs to
// trace spans and events with higher verbosity levels, consider using
// per-layer filtering to target the telemetry layer specifically,
// e.g. by target matching.
.with(tracing_subscriber::filter::LevelFilter::from_level(
Level::INFO,
))
Expand All @@ -125,19 +131,25 @@ fn init_tracing_subscriber() -> OtelGuard {
.with(OpenTelemetryLayer::new(tracer))
.init();

OtelGuard { meter_provider }
OtelGuard {
tracer_provider,
meter_provider,
}
}

struct OtelGuard {
tracer_provider: TracerProvider,
meter_provider: SdkMeterProvider,
}

impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
opentelemetry::global::shutdown_tracer_provider();
}
}

Expand Down
85 changes: 85 additions & 0 deletions tests/batch_global_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use futures_util::future::BoxFuture;
use opentelemetry::{global as otel_global, trace::TracerProvider as _};
use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
runtime,
trace::TracerProvider,
};
use tokio::runtime::Runtime;
use tracing::{info_span, subscriber, Level, Subscriber};
use tracing_opentelemetry::layer;
use tracing_subscriber::filter;
use tracing_subscriber::prelude::*;

use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, Default)]
struct TestExporter(Arc<Mutex<Vec<SpanData>>>);

impl SpanExporter for TestExporter {
fn export(&mut self, mut batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let spans = self.0.clone();
Box::pin(async move {
if let Ok(mut inner) = spans.lock() {
inner.append(&mut batch);
}
Ok(())
})
}
}

fn test_tracer(runtime: &Runtime) -> (TracerProvider, TestExporter, impl Subscriber) {
let _guard = runtime.enter();

let exporter = TestExporter::default();
let provider = TracerProvider::builder()
.with_batch_exporter(exporter.clone(), runtime::Tokio)
.build();
let tracer = provider.tracer("test");

let subscriber = tracing_subscriber::registry().with(
layer()
.with_tracer(tracer)
.with_filter(filter::Targets::new().with_target("test_telemetry", Level::INFO)),
);

(provider, exporter, subscriber)
}

#[test]
fn shutdown_in_scope() {
let rt = Runtime::new().unwrap();
let (provider, exporter, subscriber) = test_tracer(&rt);

subscriber::set_global_default(subscriber).unwrap();

for _ in 0..1000 {
let _span = info_span!(target: "test_telemetry", "test_span").entered();
}

// Should flush all batched telemetry spans
provider.shutdown().unwrap();

let spans = exporter.0.lock().unwrap();
assert_eq!(spans.len(), 1000);
}

#[test]
#[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/1961
fn shutdown_global() {
let rt = Runtime::new().unwrap();
let (provider, exporter, subscriber) = test_tracer(&rt);

otel_global::set_tracer_provider(provider);
subscriber::set_global_default(subscriber).unwrap();

for _ in 0..1000 {
let _span = info_span!(target: "test_telemetry", "test_span").entered();
}

// Should flush all batched telemetry spans
otel_global::shutdown_tracer_provider();

let spans = exporter.0.lock().unwrap();
assert_eq!(spans.len(), 1000);
}
Loading