Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed May 24, 2024
1 parent 5c7c695 commit 106ce5d
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 22 deletions.
1 change: 0 additions & 1 deletion opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn get_span_data() -> Vec<SpanData> {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/benches/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
}

#[cfg(not(target_os = "windows"))]
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/benches/span_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
}

const MAP_KEYS: [&str; 64] = [
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/benches/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ impl SpanExporter for VoidExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
}

fn trace_benchmark_group<F: Fn(&sdktrace::Tracer)>(c: &mut Criterion, name: &str, f: F) {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub trait SpanExporter: Send + Sync + Debug {
fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> {
Box::pin(async { Ok(()) })
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource);
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down Expand Up @@ -92,8 +95,6 @@ pub struct SpanData {
pub links: crate::trace::SpanLinks,
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}
8 changes: 8 additions & 0 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{TraceError, TraceResult};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -51,6 +52,7 @@ use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct InMemorySpanExporter {
spans: Arc<Mutex<Vec<SpanData>>>,
resource: Arc<Mutex<Resource>>,
}

impl Default for InMemorySpanExporter {
Expand Down Expand Up @@ -85,6 +87,7 @@ impl InMemorySpanExporterBuilder {
pub fn build(&self) -> InMemorySpanExporter {
InMemorySpanExporter {
spans: Arc::new(Mutex::new(Vec::new())),
resource: Arc::new(Mutex::new(Resource::default())),
}
}
}
Expand Down Expand Up @@ -142,4 +145,9 @@ impl SpanExporter for InMemorySpanExporter {
fn shutdown(&mut self) {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
*res_guard = resource.clone();
}
}
10 changes: 7 additions & 3 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, SpanEvents, SpanLinks},
resource::Resource,
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use futures_util::future::BoxFuture;
Expand All @@ -14,7 +15,6 @@ use opentelemetry::trace::{
use std::fmt::{Display, Formatter};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
SpanData {
span_context: SpanContext::new(
TraceId::from_u128(1),
Expand All @@ -33,7 +33,6 @@ pub fn new_test_export_span_data() -> SpanData {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
instrumentation_lib: InstrumentationLibrary::default(),
}
}
Expand Down Expand Up @@ -61,6 +60,8 @@ impl SpanExporter for TokioSpanExporter {
fn shutdown(&mut self) {
self.tx_shutdown.send(()).unwrap();
}

fn set_resource(&mut self, _resource: &crate::Resource) {}
}

pub fn new_tokio_test_exporter() -> (
Expand Down Expand Up @@ -121,4 +122,7 @@ impl SpanExporter for NoopSpanExporter {
fn export(&mut self, _: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(std::future::ready(Ok(())))
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}
17 changes: 13 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ impl Builder {
}
}

// Create a new vector to hold the modified processors
let mut processors = self.processors;

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: self.processors,
config,
}),
inner: Arc::new(TracerProviderInner { processors, config }),
}
}
}
Expand Down Expand Up @@ -267,6 +272,10 @@ mod tests {
fn shutdown(&mut self) -> TraceResult<()> {
self.force_flush()
}

fn set_resource(&mut self, _: &Resource) {
unimplemented!()
}
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// need to run those tests one by one as the GlobalTracerProvider is a shared object between
// threads Use cargo test -- --ignored --test-threads=1 to run those tests.
use crate::export::trace::{ExportResult, SpanExporter};
use crate::resource::Resource;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
Expand All @@ -28,6 +29,8 @@ impl SpanExporter for SpanCountExporter {
self.span_count.fetch_add(batch.len(), Ordering::SeqCst);
Box::pin(async { Ok(()) })
}

fn set_resource(&mut self, _: &Resource) {}
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
Expand Down
10 changes: 2 additions & 8 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! is possible to change its name, set its `Attributes`, and add `Links` and `Events`.
//! These cannot be changed after the `Span`'s end time has been set.
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::KeyValue;
use std::borrow::Cow;
Expand Down Expand Up @@ -77,11 +76,11 @@ impl Span {
/// overhead.
pub fn exported_data(&self) -> Option<crate::export::trace::SpanData> {
let (span_context, tracer) = (self.span_context.clone(), &self.tracer);
let resource = self.tracer.provider()?.config().resource.clone();
//let resource = self.tracer.provider()?.config().resource.clone();

self.data
.as_ref()
.map(|data| build_export_data(data.clone(), span_context, resource, tracer))
.map(|data| build_export_data(data.clone(), span_context, tracer))
}
}

Expand Down Expand Up @@ -225,17 +224,14 @@ impl Span {
processor.on_end(build_export_data(
data,
self.span_context.clone(),
provider.config().resource.clone(),
&self.tracer,
));
}
processors => {
let config = provider.config();
for processor in processors {
processor.on_end(build_export_data(
data.clone(),
self.span_context.clone(),
config.resource.clone(),
&self.tracer,
));
}
Expand All @@ -254,7 +250,6 @@ impl Drop for Span {
fn build_export_data(
data: SpanData,
span_context: SpanContext,
resource: Cow<'static, Resource>,
tracer: &crate::trace::Tracer,
) -> crate::export::trace::SpanData {
crate::export::trace::SpanData {
Expand All @@ -269,7 +264,6 @@ fn build_export_data(
events: data.events,
links: data.links,
status: data.status,
resource,
instrumentation_lib: tracer.instrumentation_library().clone(),
}
}
Expand Down
29 changes: 26 additions & 3 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
//! [`TracerProvider`]: opentelemetry::trace::TracerProvider
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::trace::Span;
use futures_channel::oneshot;
Expand All @@ -50,7 +51,7 @@ use opentelemetry::{
Context,
};
use std::cmp::min;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
Expand Down Expand Up @@ -92,6 +93,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processors to do any cleanup required.
fn shutdown(&mut self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource);
}

/// A [SpanProcessor] that passes finished spans to the configured
Expand Down Expand Up @@ -147,6 +150,12 @@ impl SpanProcessor for SimpleSpanProcessor {
))
}
}

fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
}
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
Expand Down Expand Up @@ -259,6 +268,13 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}

fn set_resource(&mut self, resource: &Resource) {
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
}
}

/// Messages sent between application thread and batch span processor's work thread.
Expand All @@ -275,6 +291,8 @@ enum BatchMessage {
Flush(Option<oneshot::Sender<ExportResult>>),
/// Shut down the worker thread, push all spans in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
}

struct BatchSpanProcessorInternal<R> {
Expand Down Expand Up @@ -375,8 +393,11 @@ impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
self.exporter.shutdown();
return false;
}
// propagate the resource
BatchMessage::SetResource(resource) => {
self.exporter.set_resource(&resource);
}
}

true
}

Expand Down Expand Up @@ -669,6 +690,7 @@ mod tests {
OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
};
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder,
Expand Down Expand Up @@ -710,7 +732,6 @@ mod tests {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Default::default(),
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
Expand Down Expand Up @@ -928,6 +949,8 @@ mod tests {
use futures_util::FutureExt;
Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
}

fn set_resource(&mut self, _resource: &Resource) {}
}

#[test]
Expand Down

0 comments on commit 106ce5d

Please sign in to comment.