diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index a5ced848d4..04d3e8f520 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -307,12 +307,16 @@ impl OtlpHttpClient { fn build_trace_export_body( &self, spans: Vec, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::trace::TraceResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; - let req = ExportTraceServiceRequest { - resource_spans: spans.into_iter().map(Into::into).collect(), - }; + let resource_spans = spans + .into_iter() + .map(|log_event| (log_event, resource).into()) + .collect::>(); + + let req = ExportTraceServiceRequest { resource_spans }; match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 8e272c93cf..8d6c3116cd 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient { Err(err) => return Box::pin(std::future::ready(Err(err))), }; - let (body, content_type) = match self.build_trace_export_body(batch) { + let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) { Ok(body) => body, Err(e) => return Box::pin(std::future::ready(Err(e))), }; @@ -66,4 +66,8 @@ impl SpanExporter for OtlpHttpClient { fn shutdown(&mut self) { let _ = self.client.lock().map(|mut c| c.take()); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 8a6637a5b0..6cefd611ff 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -12,7 +12,7 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, #[allow(dead_code)] - // would be removed once we support set_resource for metrics and traces. + // would be removed once we support set_resource for metrics. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index b328dfba5f..803d14626f 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -12,6 +12,9 @@ use super::BoxInterceptor; pub(crate) struct TonicTracesClient { inner: Option, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -43,6 +46,7 @@ impl TonicTracesClient { client, interceptor, }), + resource: Default::default(), } } } @@ -66,14 +70,21 @@ impl SpanExporter for TonicTracesClient { } }; + // TODO: Avoid cloning here. + let resource_spans = { + batch + .into_iter() + .map(|log_data| (log_data, &self.resource)) + .map(Into::into) + .collect() + }; + Box::pin(async move { client .export(Request::from_parts( metadata, extensions, - ExportTraceServiceRequest { - resource_spans: batch.into_iter().map(Into::into).collect(), - }, + ExportTraceServiceRequest { resource_spans }, )) .await .map_err(crate::Error::from)?; @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient { fn shutdown(&mut self) { let _ = self.inner.take(); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 4b658a89e2..34aafd4734 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -227,4 +227,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { self.0.export(batch) } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.0.set_resource(resource); + } } diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-proto/src/transform/trace.rs index 77f4f18de2..74ddc0cf32 100644 --- a/opentelemetry-proto/src/transform/trace.rs +++ b/opentelemetry-proto/src/transform/trace.rs @@ -4,7 +4,7 @@ pub mod tonic { use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status}; use crate::transform::common::{ to_nanos, - tonic::{resource_attributes, Attributes}, + tonic::{Attributes, ResourceAttributesWithSchema}, }; use opentelemetry::trace; use opentelemetry::trace::{Link, SpanId, SpanKind}; @@ -45,19 +45,15 @@ pub mod tonic { } } - impl From for ResourceSpans { - fn from(source_span: SpanData) -> Self { + impl From<(SpanData, &ResourceAttributesWithSchema)> for ResourceSpans { + fn from((source_span, resource): (SpanData, &ResourceAttributesWithSchema)) -> Self { let span_kind: span::SpanKind = source_span.span_kind.into(); ResourceSpans { resource: Some(Resource { - attributes: resource_attributes(&source_span.resource).0, + attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: source_span - .resource - .schema_url() - .map(|url| url.to_string()) - .unwrap_or_default(), + schema_url: resource.schema_url.clone().unwrap_or_default(), scope_spans: vec![ScopeSpans { schema_url: source_span .instrumentation_lib diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 774920267a..e0d67d0b50 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -5,6 +5,7 @@ use opentelemetry_sdk::export::{self, trace::ExportResult}; use std::io::{stdout, Write}; use crate::trace::transform::SpanData; +use opentelemetry_sdk::resource::Resource; type Encoder = Box TraceResult<()> + Send + Sync>; @@ -12,6 +13,7 @@ type Encoder = Box TraceResult<()> + Send + pub struct SpanExporter { writer: Option>, encoder: Encoder, + resource: Resource, } impl fmt::Debug for SpanExporter { @@ -36,7 +38,11 @@ impl Default for SpanExporter { impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { let res = if let Some(writer) = &mut self.writer { - (self.encoder)(writer, crate::trace::SpanData::from(batch)).and_then(|_| { + (self.encoder)( + writer, + crate::trace::SpanData::from((batch, &self.resource)), + ) + .and_then(|_| { writer .write_all(b"\n") .map_err(|err| TraceError::Other(Box::new(err))) @@ -51,6 +57,10 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn shutdown(&mut self) { self.writer.take(); } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } } /// Configuration for the stdout trace exporter @@ -107,6 +117,7 @@ impl SpanExporterBuilder { pub fn build(self) -> SpanExporter { SpanExporter { writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), + resource: Resource::default(), encoder: self.encoder.unwrap_or_else(|| { Box::new(|writer, spans| { serde_json::to_writer(writer, &spans) diff --git a/opentelemetry-stdout/src/trace/transform.rs b/opentelemetry-stdout/src/trace/transform.rs index 66a659de07..7c0903cbc6 100644 --- a/opentelemetry-stdout/src/trace/transform.rs +++ b/opentelemetry-stdout/src/trace/transform.rs @@ -9,17 +9,27 @@ pub struct SpanData { resource_spans: Vec, } -impl From> for SpanData { - fn from(sdk_spans: Vec) -> Self { +impl + From<( + Vec, + &opentelemetry_sdk::Resource, + )> for SpanData +{ + fn from( + (sdk_spans, sdk_resource): ( + Vec, + &opentelemetry_sdk::Resource, + ), + ) -> Self { let mut resource_spans = HashMap::::new(); for sdk_span in sdk_spans { - let resource_schema_url = sdk_span.resource.schema_url().map(|s| s.to_string().into()); + let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_span.instrumentation_lib.schema_url.clone(); let scope = sdk_span.instrumentation_lib.clone().into(); - let resource = sdk_span.resource.as_ref().into(); + let resource: Resource = sdk_resource.into(); let rs = resource_spans - .entry(sdk_span.resource.as_ref().into()) + .entry(sdk_resource.into()) .or_insert_with(move || ResourceSpans { resource, scope_spans: Vec::with_capacity(1), diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index 0bd2d19016..15dfdd3ce6 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -231,6 +231,8 @@ impl trace::SpanExporter for Exporter { self.local_endpoint.clone(), )) } + + fn set_resource(&mut self, _resource: &Resource) {} } /// Wrap type for errors from opentelemetry zipkin diff --git a/opentelemetry-zipkin/src/exporter/model/span.rs b/opentelemetry-zipkin/src/exporter/model/span.rs index 6e21b52588..8c9c7fd5a1 100644 --- a/opentelemetry-zipkin/src/exporter/model/span.rs +++ b/opentelemetry-zipkin/src/exporter/model/span.rs @@ -60,9 +60,8 @@ mod tests { use crate::exporter::model::span::{Kind, Span}; use crate::exporter::model::{into_zipkin_span, OTEL_ERROR_DESCRIPTION, OTEL_STATUS_CODE}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId}; + use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; - use opentelemetry_sdk::{export::trace::SpanData, Resource}; - use std::borrow::Cow; use std::collections::HashMap; use std::net::Ipv4Addr; use std::time::SystemTime; @@ -166,7 +165,6 @@ mod tests { events: SpanEvents::default(), links: SpanLinks::default(), status, - resource: Cow::Owned(Resource::default()), instrumentation_lib: Default::default(), }; let local_endpoint = Endpoint::new("test".into(), None); diff --git a/stress/src/traces.rs b/stress/src/traces.rs index 0dd992f708..c1c8872f50 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -45,6 +45,8 @@ impl SpanProcessor for NoOpSpanProcessor { fn shutdown(&mut self) -> TraceResult<()> { Ok(()) } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} } fn main() {