diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 07a70bf57d..2067bc7d4a 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -224,7 +224,7 @@ impl HttpExporterBuilder { OTEL_EXPORTER_OTLP_TRACES_HEADERS, )?; - Ok(crate::SpanExporter::new(client)) + Ok(crate::SpanExporter::from_http(client)) } /// Create a log exporter with the current configuration diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index cf3411e2e0..4a25718b68 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::{otel_debug, trace::TraceError}; use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter}; @@ -8,44 +7,42 @@ use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter}; use super::OtlpHttpClient; impl SpanExporter for OtlpHttpClient { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - let client = match self - .client - .lock() - .map_err(|e| TraceError::Other(e.to_string().into())) - .and_then(|g| match &*g { - Some(client) => Ok(Arc::clone(client)), - _ => Err(TraceError::Other("exporter is already shut down".into())), - }) { - Ok(client) => client, - Err(err) => return Box::pin(std::future::ready(Err(err))), - }; + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + async { + let client = match self + .client + .lock() + .map_err(|e| TraceError::Other(e.to_string().into())) + .and_then(|g| match &*g { + Some(client) => Ok(Arc::clone(client)), + _ => Err(TraceError::Other("exporter is already shut down".into())), + }) { + Ok(client) => client, + Err(err) => return Err(err), + }; - let (body, content_type) = match self.build_trace_export_body(batch) { - Ok(body) => body, - Err(e) => return Box::pin(std::future::ready(Err(e))), - }; + let (body, content_type) = match self.build_trace_export_body(batch) { + Ok(body) => body, + Err(e) => return Err(e), + }; - let mut request = match http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body) - { - Ok(req) => req, - Err(e) => { - return Box::pin(std::future::ready(Err(crate::Error::RequestFailed( - Box::new(e), - ) - .into()))) - } - }; + let mut request = match http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body) + { + Ok(req) => req, + Err(e) => return Err(crate::Error::RequestFailed(Box::new(e)).into()), + }; - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); - } + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } - Box::pin(async move { let request_uri = request.uri().to_string(); otel_debug!(name: "HttpTracesClient.CallingExport"); let response = client.send(request).await?; @@ -61,7 +58,7 @@ impl SpanExporter for OtlpHttpClient { } Ok(()) - }) + } } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 9e2b54c631..ec46c00b99 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -317,7 +317,7 @@ impl TonicExporterBuilder { let client = TonicTracesClient::new(channel, interceptor, compression); - Ok(crate::SpanExporter::new(client)) + Ok(crate::SpanExporter::from_tonic(client)) } } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index fb72cccf4e..e6c7d14910 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,15 +1,14 @@ use core::fmt; -use futures_core::future::BoxFuture; use opentelemetry::{otel_debug, trace::TraceError}; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; +use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter}; +use tokio::sync::Mutex; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; - use super::BoxInterceptor; pub(crate) struct TonicTracesClient { @@ -21,7 +20,7 @@ pub(crate) struct TonicTracesClient { struct ClientInner { client: TraceServiceClient, - interceptor: BoxInterceptor, + interceptor: Mutex, } impl fmt::Debug for TonicTracesClient { @@ -48,7 +47,7 @@ impl TonicTracesClient { TonicTracesClient { inner: Some(ClientInner { client, - interceptor, + interceptor: Mutex::new(interceptor), }), resource: Default::default(), } @@ -56,29 +55,31 @@ impl TonicTracesClient { } impl SpanExporter for TonicTracesClient { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - let (mut client, metadata, extensions) = match &mut self.inner { - Some(inner) => { - let (m, e, _) = match inner.interceptor.call(Request::new(())) { - Ok(res) => res.into_parts(), - Err(e) => { - return Box::pin(std::future::ready(Err(TraceError::Other(Box::new(e))))) - } - }; - (inner.client.clone(), m, e) - } - None => { - return Box::pin(std::future::ready(Err(TraceError::Other( - "exporter is already shut down".into(), - )))) - } - }; + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + async move { + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| TraceError::Other(Box::new(e)))? + .into_parts(); + (inner.client.clone(), m, e) + } + None => { + return Err(TraceError::Other("exporter is already shut down".into())); + } + }; + + let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource); + + otel_debug!(name: "TonicsTracesClient.CallingExport"); - let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource); - - otel_debug!(name: "TonicsTracesClient.CallingExport"); - - Box::pin(async move { client .export(Request::from_parts( metadata, @@ -89,7 +90,7 @@ impl SpanExporter for TonicTracesClient { .map_err(crate::Error::from)?; Ok(()) - }) + } } fn shutdown(&mut self) { diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index b8c013f5d4..79955bde42 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -4,7 +4,6 @@ use std::fmt::Debug; -use futures_core::future::BoxFuture; use opentelemetry_sdk::trace::{ExportResult, SpanData}; #[cfg(feature = "grpc-tonic")] @@ -63,17 +62,17 @@ impl SpanExporterBuilder { #[cfg(feature = "grpc-tonic")] impl SpanExporterBuilder { pub fn build(self) -> Result { - let span_exporter = self.client.0.build_span_exporter()?; + let result = self.client.0.build_span_exporter(); opentelemetry::otel_debug!(name: "SpanExporterBuilt"); - Ok(SpanExporter::new(span_exporter)) + result } } #[cfg(any(feature = "http-proto", feature = "http-json"))] impl SpanExporterBuilder { pub fn build(self) -> Result { - let span_exporter = self.client.0.build_span_exporter()?; - Ok(SpanExporter::new(span_exporter)) + let result = self.client.0.build_span_exporter(); + result } } @@ -107,26 +106,61 @@ impl HasHttpConfig for SpanExporterBuilder { /// OTLP exporter that sends tracing information #[derive(Debug)] -pub struct SpanExporter(Box); +pub struct SpanExporter { + client: SupportedTransportClient, +} + +#[derive(Debug)] +enum SupportedTransportClient { + #[cfg(feature = "grpc-tonic")] + Tonic(crate::exporter::tonic::trace::TonicTracesClient), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + Http(crate::exporter::http::OtlpHttpClient), +} impl SpanExporter { - /// Obtain a builder to configure a [SpanExporter]. + /// Obtain a builder to configure a [LogExporter]. pub fn builder() -> SpanExporterBuilder { SpanExporterBuilder::default() } - /// Build a new span exporter from a client - pub fn new(client: impl opentelemetry_sdk::trace::SpanExporter + 'static) -> Self { - SpanExporter(Box::new(client)) + #[cfg(any(feature = "http-proto", feature = "http-json"))] + pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self { + SpanExporter { + client: SupportedTransportClient::Http(client), + } + } + + #[cfg(feature = "grpc-tonic")] + pub(crate) fn from_tonic(client: crate::exporter::tonic::trace::TonicTracesClient) -> Self { + SpanExporter { + client: SupportedTransportClient::Tonic(client), + } } } impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - self.0.export(batch) + #[allow(clippy::manual_async_fn)] + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + async move { + match &self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.export(batch).await, + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.export(batch).await, + } + } } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.0.set_resource(resource); + match &mut self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.set_resource(resource), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.set_resource(resource), + } } } diff --git a/opentelemetry-sdk/benches/context.rs b/opentelemetry-sdk/benches/context.rs index 8a9108639a..6f3d9a3c6f 100644 --- a/opentelemetry-sdk/benches/context.rs +++ b/opentelemetry-sdk/benches/context.rs @@ -137,8 +137,11 @@ fn parent_sampled_tracer(inner_sampler: Sampler) -> (TracerProvider, BoxedTracer struct NoopExporter; impl SpanExporter for NoopExporter { - fn export(&mut self, _spans: Vec) -> BoxFuture<'static, ExportResult> { - Box::pin(futures_util::future::ready(Ok(()))) + fn export( + &self, + _spans: Vec, + ) -> impl std::future::Future + Send { + async { Ok(()) } } } diff --git a/opentelemetry-sdk/benches/span_builder.rs b/opentelemetry-sdk/benches/span_builder.rs index 7f78b738f9..63631f6a35 100644 --- a/opentelemetry-sdk/benches/span_builder.rs +++ b/opentelemetry-sdk/benches/span_builder.rs @@ -65,8 +65,11 @@ fn not_sampled_provider() -> (sdktrace::TracerProvider, sdktrace::Tracer) { struct NoopExporter; impl SpanExporter for NoopExporter { - fn export(&mut self, _spans: Vec) -> BoxFuture<'static, ExportResult> { - Box::pin(futures_util::future::ready(Ok(()))) + fn export( + &self, + _spans: Vec, + ) -> impl std::future::Future + Send { + async { Ok(()) } } } diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 4a90e4def4..6b4792056e 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -2,7 +2,6 @@ use crate::{ trace::{ExportResult, SpanData, SpanExporter}, trace::{SpanEvents, SpanLinks}, }; -use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::{ trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}, @@ -40,17 +39,22 @@ pub struct TokioSpanExporter { } impl SpanExporter for TokioSpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - for span_data in batch { - if let Err(err) = self - .tx_export - .send(span_data) - .map_err::(Into::into) - { - return Box::pin(std::future::ready(Err(Into::into(err)))); + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + async { + for span_data in batch { + if let Err(err) = self + .tx_export + .send(span_data) + .map_err::(Into::into) + { + return Err(Into::into(err)); + } } + Ok(()) } - Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { @@ -113,7 +117,7 @@ impl NoopSpanExporter { #[async_trait::async_trait] impl SpanExporter for NoopSpanExporter { - fn export(&mut self, _: Vec) -> BoxFuture<'static, ExportResult> { - Box::pin(std::future::ready(Ok(()))) + fn export(&self, _: Vec) -> impl std::future::Future + Send { + async { Ok(()) } } } diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs index c606d85b1a..d7495d5a2c 100644 --- a/opentelemetry-sdk/src/trace/export.rs +++ b/opentelemetry-sdk/src/trace/export.rs @@ -1,6 +1,5 @@ //! Trace exporters use crate::Resource; -use futures_util::future::BoxFuture; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError}; use opentelemetry::{InstrumentationScope, KeyValue}; use std::borrow::Cow; @@ -30,7 +29,10 @@ pub trait SpanExporter: Send + Sync + Debug { /// /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult>; + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. @@ -60,8 +62,8 @@ pub trait SpanExporter: Send + Sync + Debug { /// implemented as a blocking API or an asynchronous API which notifies the caller via /// a callback or an event. OpenTelemetry client authors can decide if they want to /// make the flush timeout configurable. - fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { - Box::pin(async { Ok(()) }) + fn force_flush(&mut self) -> impl std::future::Future + Send { + async { Ok(()) } } /// Set the resource for the exporter. diff --git a/opentelemetry-sdk/src/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/trace/in_memory_exporter.rs index 4f85f46444..61af7c5be8 100644 --- a/opentelemetry-sdk/src/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/trace/in_memory_exporter.rs @@ -1,6 +1,5 @@ use crate::resource::Resource; use crate::trace::{ExportResult, SpanData, SpanExporter}; -use futures_util::future::BoxFuture; use opentelemetry::trace::{TraceError, TraceResult}; use std::sync::{Arc, Mutex}; @@ -130,7 +129,10 @@ impl InMemorySpanExporter { } impl SpanExporter for InMemorySpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { if let Err(err) = self .spans .lock() diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index a5457ec4db..696c5cb298 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -78,7 +78,7 @@ mod tests { // Arrange let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); // Act @@ -112,7 +112,7 @@ mod tests { // Arrange let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); // Act @@ -148,7 +148,7 @@ mod tests { // Arrange let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); // Act @@ -184,7 +184,7 @@ mod tests { // Arrange let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); // Act @@ -220,7 +220,7 @@ mod tests { // Arrange let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); // Act @@ -256,7 +256,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() .with_sampler(Sampler::AlwaysOff) - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter.clone())) .build(); let tracer = provider.tracer("test"); @@ -309,7 +309,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let provider = TracerProvider::builder() .with_sampler(TestRecordOnlySampler::default()) - .with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone()))) + .with_span_processor(SimpleSpanProcessor::new(exporter)) .build(); let tracer = provider.tracer("test"); @@ -381,7 +381,7 @@ mod tests { } let exporter = InMemorySpanExporter::default(); - let span_processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + let span_processor = SimpleSpanProcessor::new(exporter.clone()); let tracer_provider = TracerProvider::builder() .with_span_processor(span_processor) .build(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 776eb8a68a..fec5f3df41 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -293,7 +293,7 @@ impl Builder { /// Processors are invoked in the order they are added. pub fn with_simple_exporter(self, exporter: T) -> Self { let mut processors = self.processors; - processors.push(Box::new(SimpleSpanProcessor::new(Box::new(exporter)))); + processors.push(Box::new(SimpleSpanProcessor::new(exporter))); Builder { processors, ..self } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 278a27a2e1..2c81ca3169 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -114,20 +114,20 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// - `reqwest-client`: TracerProvider may be created anywhere, but spans must be /// emitted from a tokio runtime thread. #[derive(Debug)] -pub struct SimpleSpanProcessor { - exporter: Mutex>, +pub struct SimpleSpanProcessor { + exporter: Mutex, } -impl SimpleSpanProcessor { +impl SimpleSpanProcessor { /// Create a new [SimpleSpanProcessor] using the provided exporter. - pub fn new(exporter: Box) -> Self { + pub(crate) fn new(exporter: T) -> Self { Self { exporter: Mutex::new(exporter), } } } -impl SpanProcessor for SimpleSpanProcessor { +impl SpanProcessor for SimpleSpanProcessor { fn on_start(&self, _span: &mut Span, _cx: &Context) { // Ignored } @@ -141,7 +141,7 @@ impl SpanProcessor for SimpleSpanProcessor { .exporter .lock() .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into())) - .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span]))); + .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span]))); if let Err(err) = result { // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug` @@ -830,7 +830,7 @@ mod tests { #[test] fn simple_span_processor_on_end_calls_export() { let exporter = InMemorySpanExporterBuilder::new().build(); - let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); processor.on_end(span_data.clone()); assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data); @@ -840,7 +840,7 @@ mod tests { #[test] fn simple_span_processor_on_end_skips_export_if_not_sampled() { let exporter = InMemorySpanExporterBuilder::new().build(); - let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + let processor = SimpleSpanProcessor::new(exporter.clone()); let unsampled = SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::INVALID, @@ -862,7 +862,7 @@ mod tests { #[test] fn simple_span_processor_shutdown_calls_shutdown() { let exporter = InMemorySpanExporterBuilder::new().build(); - let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); processor.on_end(span_data.clone()); assert!(!exporter.get_finished_spans().unwrap().is_empty()); @@ -991,8 +991,6 @@ mod tests { } use crate::Resource; - use futures_util::future::BoxFuture; - use futures_util::FutureExt; use opentelemetry::{Key, KeyValue, Value}; use std::sync::{atomic::Ordering, Arc, Mutex}; @@ -1013,13 +1011,15 @@ mod tests { } impl SpanExporter for MockSpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { let exported_spans = self.exported_spans.clone(); async move { exported_spans.lock().unwrap().extend(batch); Ok(()) } - .boxed() } fn shutdown(&mut self) {} diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index ebe73a7cec..a54adc4f2c 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -181,15 +181,15 @@ enum BatchMessage { SetResource(Arc), } -struct BatchSpanProcessorInternal { +struct BatchSpanProcessorInternal { spans: Vec, export_tasks: FuturesUnordered>, runtime: R, - exporter: Box, + exporter: E, config: BatchConfig, } -impl BatchSpanProcessorInternal { +impl BatchSpanProcessorInternal { async fn flush(&mut self, res_channel: Option>) { let export_task = self.export(); let task = Box::pin(async move { diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 9653ae77f9..e769865676 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -1,17 +1,16 @@ use chrono::{DateTime, Utc}; use core::fmt; -use futures_util::future::BoxFuture; use opentelemetry::trace::TraceError; use opentelemetry_sdk::trace::{ExportResult, SpanData}; -use std::sync::atomic; +use std::sync::atomic::{AtomicBool, Ordering}; use opentelemetry_sdk::resource::Resource; /// An OpenTelemetry exporter that writes Spans to stdout on export. pub struct SpanExporter { resource: Resource, - is_shutdown: atomic::AtomicBool, - resource_emitted: bool, + is_shutdown: AtomicBool, + resource_emitted: AtomicBool, } impl fmt::Debug for SpanExporter { @@ -24,43 +23,45 @@ impl Default for SpanExporter { fn default() -> Self { SpanExporter { resource: Resource::builder().build(), - is_shutdown: atomic::AtomicBool::new(false), - resource_emitted: false, + is_shutdown: AtomicBool::new(false), + resource_emitted: AtomicBool::new(false), } } } impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { /// Write Spans to stdout - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { - Box::pin(std::future::ready(Err(TraceError::from( - "exporter is shut down", - )))) - } else { - println!("Spans"); - if self.resource_emitted { - print_spans(batch); + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + async move { + if self.is_shutdown.load(Ordering::SeqCst) { + return Err(TraceError::from("exporter is shut down")); } else { - self.resource_emitted = true; - println!("Resource"); - if let Some(schema_url) = self.resource.schema_url() { - println!("\tResource SchemaUrl: {:?}", schema_url); + println!("Spans"); + if self.resource_emitted.load(Ordering::SeqCst) { + print_spans(batch); + } else { + self.resource_emitted.store(true, Ordering::SeqCst); + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\tResource SchemaUrl: {:?}", schema_url); + } + + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + + print_spans(batch); } - - self.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - - print_spans(batch); + Ok(()) } - - Box::pin(std::future::ready(Ok(()))) } } fn shutdown(&mut self) { - self.is_shutdown.store(true, atomic::Ordering::SeqCst); + self.is_shutdown.store(true, Ordering::SeqCst); } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index dbfd549076..3a673000e9 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -3,7 +3,6 @@ mod model; mod uploader; use async_trait::async_trait; -use futures_core::future::BoxFuture; use http::Uri; use model::endpoint::Endpoint; use opentelemetry::{global, trace::TraceError, InstrumentationScope, KeyValue}; @@ -228,12 +227,11 @@ async fn zipkin_export( #[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Zipkin collector. - fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { - Box::pin(zipkin_export( - batch, - self.uploader.clone(), - self.local_endpoint.clone(), - )) + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + zipkin_export(batch, self.uploader.clone(), self.local_endpoint.clone()) } }