Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Span exporter native future #46

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl HttpExporterBuilder {
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
)?;

Ok(crate::SpanExporter::new(client))
Ok(crate::SpanExporter::from_http(client))
}

/// Create a log exporter with the current configuration
Expand Down
69 changes: 33 additions & 36 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,48 @@
use std::sync::Arc;

use futures_core::future::BoxFuture;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::{otel_debug, trace::TraceError};
use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter};

use super::OtlpHttpClient;

impl SpanExporter for OtlpHttpClient {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let client = match self
.client
.lock()
.map_err(|e| TraceError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(TraceError::Other("exporter is already shut down".into())),
}) {
Ok(client) => client,
Err(err) => return Box::pin(std::future::ready(Err(err))),
};
fn export(
&self,
batch: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async {
let client = match self
.client
.lock()
.map_err(|e| TraceError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(TraceError::Other("exporter is already shut down".into())),
}) {
Ok(client) => client,
Err(err) => return Err(err),
};

let (body, content_type) = match self.build_trace_export_body(batch) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
let (body, content_type) = match self.build_trace_export_body(batch) {
Ok(body) => body,
Err(e) => return Err(e),
};

let mut request = match http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
{
Ok(req) => req,
Err(e) => {
return Box::pin(std::future::ready(Err(crate::Error::RequestFailed(
Box::new(e),
)
.into())))
}
};
let mut request = match http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
{
Ok(req) => req,
Err(e) => return Err(crate::Error::RequestFailed(Box::new(e)).into()),
};

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}
for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

Box::pin(async move {
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpTracesClient.CallingExport");
let response = client.send(request).await?;
Expand All @@ -61,7 +58,7 @@ impl SpanExporter for OtlpHttpClient {
}

Ok(())
})
}
}

fn shutdown(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl TonicExporterBuilder {

let client = TonicTracesClient::new(channel, interceptor, compression);

Ok(crate::SpanExporter::new(client))
Ok(crate::SpanExporter::from_tonic(client))
}
}

Expand Down
57 changes: 29 additions & 28 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use core::fmt;

use futures_core::future::BoxFuture;
use opentelemetry::{otel_debug, trace::TraceError};
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter};
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
Expand All @@ -21,7 +20,7 @@ pub(crate) struct TonicTracesClient {

struct ClientInner {
client: TraceServiceClient<Channel>,
interceptor: BoxInterceptor,
interceptor: Mutex<BoxInterceptor>,
}

impl fmt::Debug for TonicTracesClient {
Expand All @@ -48,37 +47,39 @@ impl TonicTracesClient {
TonicTracesClient {
inner: Some(ClientInner {
client,
interceptor,
interceptor: Mutex::new(interceptor),
}),
resource: Default::default(),
}
}
}

impl SpanExporter for TonicTracesClient {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = match inner.interceptor.call(Request::new(())) {
Ok(res) => res.into_parts(),
Err(e) => {
return Box::pin(std::future::ready(Err(TraceError::Other(Box::new(e)))))
}
};
(inner.client.clone(), m, e)
}
None => {
return Box::pin(std::future::ready(Err(TraceError::Other(
"exporter is already shut down".into(),
))))
}
};
fn export(
&self,
batch: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| TraceError::Other(Box::new(e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => {
return Err(TraceError::Other("exporter is already shut down".into()));
}
};

let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);

otel_debug!(name: "TonicsTracesClient.CallingExport");

let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);

otel_debug!(name: "TonicsTracesClient.CallingExport");

Box::pin(async move {
client
.export(Request::from_parts(
metadata,
Expand All @@ -89,7 +90,7 @@ impl SpanExporter for TonicTracesClient {
.map_err(crate::Error::from)?;

Ok(())
})
}
}

fn shutdown(&mut self) {
Expand Down
60 changes: 47 additions & 13 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use std::fmt::Debug;

use futures_core::future::BoxFuture;
use opentelemetry_sdk::trace::{ExportResult, SpanData};

#[cfg(feature = "grpc-tonic")]
Expand Down Expand Up @@ -63,17 +62,17 @@ impl SpanExporterBuilder<NoExporterBuilderSet> {
#[cfg(feature = "grpc-tonic")]
impl SpanExporterBuilder<TonicExporterBuilderSet> {
pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError> {
let span_exporter = self.client.0.build_span_exporter()?;
let result = self.client.0.build_span_exporter();
opentelemetry::otel_debug!(name: "SpanExporterBuilt");
Ok(SpanExporter::new(span_exporter))
result
}
}

#[cfg(any(feature = "http-proto", feature = "http-json"))]
impl SpanExporterBuilder<HttpExporterBuilderSet> {
pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError> {
let span_exporter = self.client.0.build_span_exporter()?;
Ok(SpanExporter::new(span_exporter))
let result = self.client.0.build_span_exporter();
result
}
}

Expand Down Expand Up @@ -107,26 +106,61 @@ impl HasHttpConfig for SpanExporterBuilder<HttpExporterBuilderSet> {

/// OTLP exporter that sends tracing information
#[derive(Debug)]
pub struct SpanExporter(Box<dyn opentelemetry_sdk::trace::SpanExporter>);
pub struct SpanExporter {
client: SupportedTransportClient,
}

#[derive(Debug)]
enum SupportedTransportClient {
#[cfg(feature = "grpc-tonic")]
Tonic(crate::exporter::tonic::trace::TonicTracesClient),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
Http(crate::exporter::http::OtlpHttpClient),
}

impl SpanExporter {
/// Obtain a builder to configure a [SpanExporter].
/// Obtain a builder to configure a [LogExporter].
pub fn builder() -> SpanExporterBuilder<NoExporterBuilderSet> {
SpanExporterBuilder::default()
}

/// Build a new span exporter from a client
pub fn new(client: impl opentelemetry_sdk::trace::SpanExporter + 'static) -> Self {
SpanExporter(Box::new(client))
#[cfg(any(feature = "http-proto", feature = "http-json"))]
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {
SpanExporter {
client: SupportedTransportClient::Http(client),
}
}

#[cfg(feature = "grpc-tonic")]
pub(crate) fn from_tonic(client: crate::exporter::tonic::trace::TonicTracesClient) -> Self {
SpanExporter {
client: SupportedTransportClient::Tonic(client),
}
}
}

impl opentelemetry_sdk::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
self.0.export(batch)
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(batch).await,
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.export(batch).await,
}
}
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
match &mut self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.set_resource(resource),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}
}
7 changes: 5 additions & 2 deletions opentelemetry-sdk/benches/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ fn parent_sampled_tracer(inner_sampler: Sampler) -> (TracerProvider, BoxedTracer
struct NoopExporter;

impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
fn export(
&self,
_spans: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async { Ok(()) }
}
}

Expand Down
7 changes: 5 additions & 2 deletions opentelemetry-sdk/benches/span_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ fn not_sampled_provider() -> (sdktrace::TracerProvider, sdktrace::Tracer) {
struct NoopExporter;

impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
fn export(
&self,
_spans: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async { Ok(()) }
}
}

Expand Down
28 changes: 16 additions & 12 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
trace::{SpanEvents, SpanLinks},
};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::{
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
Expand Down Expand Up @@ -40,17 +39,22 @@ pub struct TokioSpanExporter {
}

impl SpanExporter for TokioSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
fn export(
&self,
batch: Vec<SpanData>,
) -> impl std::future::Future<Output = ExportResult> + Send {
async {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Err(Into::into(err));
}
}
Ok(())
}
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
Expand Down Expand Up @@ -113,7 +117,7 @@ impl NoopSpanExporter {

#[async_trait::async_trait]
impl SpanExporter for NoopSpanExporter {
fn export(&mut self, _: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(std::future::ready(Ok(())))
fn export(&self, _: Vec<SpanData>) -> impl std::future::Future<Output = ExportResult> + Send {
async { Ok(()) }
}
}
Loading
Loading