diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 9f73e9c890..a65d4c2b3d 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -10,7 +10,7 @@ | noop_layer_disabled | 12 ns | | noop_layer_enabled | 25 ns | | ot_layer_disabled | 19 ns | - | ot_layer_enabled | 588 ns | + | ot_layer_enabled | 446 ns | */ use async_trait::async_trait; @@ -33,7 +33,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&mut self, _: Vec) -> LogResult<()> { + async fn export<'a>(&mut self, _: Vec>) -> LogResult<()> { LogResult::Ok(()) } @@ -54,7 +54,7 @@ impl NoopProcessor { } impl LogProcessor for NoopProcessor { - fn emit(&self, _: LogData) { + fn emit(&self, _: &mut LogData) { // no-op } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 4bad1cc39e..3e59ffdd8e 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -9,7 +9,7 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { let client = self .client .lock() @@ -19,7 +19,13 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? }; + //TODO: avoid cloning here. + let owned_batch = batch + .into_iter() + .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData + .collect::>(); + + let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? 
}; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 65759f7a4b..8a6637a5b0 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -52,7 +52,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner @@ -65,9 +65,11 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; + // TODO: Avoid cloning here. let resource_logs = { batch .into_iter() + .map(|log_data_cow| (log_data_cow.into_owned())) .map(|log_data| (log_data, &self.resource)) .map(Into::into) .collect() diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 13e407a678..72e3b352ec 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -98,7 +98,10 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { + async fn export<'a>( + &mut self, + batch: Vec>, + ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 47c4a7f179..17e0483a5d 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -11,6 +11,18 @@ logger provider. - Removed dependency on `ordered-float`. +- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726) + Update `LogProcessor::emit() method to take mutable reference to LogData. 
This is a breaking + change for LogProcessor developers. If the processor needs to invoke the exporter + asynchronously, it should clone the data to ensure it can be safely processed without + lifetime issues. Any changes made to the log data before cloning in this method will be + reflected in the next log processor in the chain, as well as to the exporter. +- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726) + Update `LogExporter::export()` method to accept a batch of log data, which can be either a + reference or owned `LogData`. If the exporter needs to process the log data + asynchronously, it should clone the log data to ensure it can be safely processed without + lifetime issues. + ## v0.23.0 - Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index f70bb3e650..72f205c5e0 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -1,3 +1,8 @@ +//! run with `$ cargo bench --bench log --features=logs -- --exact <test_name>` to run specific test for logs +//! So to run test named "full-log-with-attributes/with-context" you would run `$ cargo bench --bench log --features=logs -- --exact full-log-with-attributes/with-context` +//! To run all tests for logs you would run `$ cargo bench --bench log --features=logs` +//! 
+ use std::collections::HashMap; use std::time::SystemTime; @@ -19,7 +24,7 @@ struct VoidExporter; #[async_trait] impl LogExporter for VoidExporter { - async fn export(&mut self, _batch: Vec) -> LogResult<()> { + async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { LogResult::Ok(()) } } diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5c56e355b4..9588339462 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -8,13 +8,14 @@ use opentelemetry::{ logs::{LogError, LogResult}, InstrumentationLibrary, }; +use std::borrow::Cow; use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export(&mut self, batch: Vec) -> LogResult<()>; + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 3a552e4e57..d20e602ae1 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -256,20 +256,21 @@ impl opentelemetry::logs::Logger for Logger { cx.has_active_span() .then(|| TraceContext::from(cx.span().span_context())) }); + let mut log_record = record; + if let Some(ref trace_context) = trace_context { + log_record.trace_context = Some(trace_context.clone()); + } + if log_record.observed_timestamp.is_none() { + log_record.observed_timestamp = Some(SystemTime::now()); + } + + let mut data = LogData { + record: log_record, + instrumentation: self.instrumentation_library().clone(), + }; for p in processors { - let mut cloned_record = record.clone(); - if let Some(ref trace_context) = trace_context { - cloned_record.trace_context = Some(trace_context.clone()); - } - if 
cloned_record.observed_timestamp.is_none() { - cloned_record.observed_timestamp = Some(SystemTime::now()); - } - let data = LogData { - record: cloned_record, - instrumentation: self.instrumentation_library().clone(), - }; - p.emit(data); + p.emit(&mut data); } } @@ -326,7 +327,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: LogData) { + fn emit(&self, _data: &mut LogData) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -561,7 +562,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: LogData) { + fn emit(&self, _data: &mut LogData) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 1aad790c64..34d7fbce14 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -14,6 +14,7 @@ use opentelemetry::{ global, logs::{LogError, LogResult}, }; +use std::borrow::Cow; use std::sync::atomic::AtomicBool; use std::{cmp::min, env, sync::Mutex}; use std::{ @@ -45,7 +46,16 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// [`Logger`]: crate::logs::Logger pub trait LogProcessor: Send + Sync + Debug { /// Called when a log record is ready to processed and exported. - fn emit(&self, data: LogData); + /// + /// This method receives a mutable reference to `LogData`. If the processor + /// needs to handle the export asynchronously, it should clone the data to + /// ensure it can be safely processed without lifetime issues. Any changes + /// made to the log data in this method will be reflected in the next log + /// processor in the chain. + /// + /// # Parameters + /// - `data`: A mutable reference to `LogData` representing the log record. + fn emit(&self, data: &mut LogData); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. 
@@ -80,7 +90,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: LogData) { + fn emit(&self, data: &mut LogData) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -90,7 +100,9 @@ impl LogProcessor for SimpleLogProcessor { .exporter .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) - .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![data]))); + .and_then(|mut exporter| { + futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)])) + }); if let Err(err) = result { global::handle_error(err); } @@ -140,8 +152,10 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: LogData) { - let result = self.message_sender.try_send(BatchMessage::ExportLog(data)); + fn emit(&self, data: &mut LogData) { + let result = self + .message_sender + .try_send(BatchMessage::ExportLog(data.clone())); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); @@ -201,7 +215,7 @@ impl BatchLogProcessor { match message { // Log has finished, add to buffer of pending logs. 
BatchMessage::ExportLog(log) => { - logs.push(log); + logs.push(Cow::Owned(log)); if logs.len() == config.max_export_batch_size { let result = export_with_timeout( @@ -285,11 +299,11 @@ impl BatchLogProcessor { } } -async fn export_with_timeout( +async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec, + batch: Vec>, ) -> ExportResult where R: RuntimeChannel, @@ -510,8 +524,14 @@ mod tests { Resource, }; use async_trait::async_trait; + use opentelemetry::logs::AnyValue; + #[cfg(feature = "logs_level_enabled")] + use opentelemetry::logs::Severity; + use opentelemetry::logs::{Logger, LoggerProvider as _}; + use opentelemetry::Key; use opentelemetry::{logs::LogResult, KeyValue}; - use std::sync::Arc; + use std::borrow::Cow; + use std::sync::{Arc, Mutex}; use std::time::Duration; #[derive(Debug, Clone)] @@ -521,7 +541,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export(&mut self, _batch: Vec) -> LogResult<()> { + async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { Ok(()) } @@ -743,17 +763,15 @@ mod tests { BatchConfig::default(), runtime::Tokio, ); - processor.emit(LogData { + let mut log_data = LogData { record: Default::default(), instrumentation: Default::default(), - }); + }; + processor.emit(&mut log_data); processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? 
- processor.emit(LogData { - record: Default::default(), - instrumentation: Default::default(), - }); + processor.emit(&mut log_data); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) } @@ -764,10 +782,12 @@ mod tests { .build(); let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); - processor.emit(LogData { + let mut log_data = LogData { record: Default::default(), instrumentation: Default::default(), - }); + }; + + processor.emit(&mut log_data); processor.shutdown().unwrap(); @@ -776,11 +796,125 @@ mod tests { .load(std::sync::atomic::Ordering::Relaxed); assert!(is_shutdown); - processor.emit(LogData { - record: Default::default(), - instrumentation: Default::default(), - }); + processor.emit(&mut log_data); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) } + + #[derive(Debug)] + struct FirstProcessor { + pub(crate) logs: Arc>>, + } + + impl LogProcessor for FirstProcessor { + fn emit(&self, data: &mut LogData) { + // add attribute + data.record.attributes.get_or_insert(vec![]).push(( + Key::from_static_str("processed_by"), + AnyValue::String("FirstProcessor".into()), + )); + // update body + data.record.body = Some("Updated by FirstProcessor".into()); + + self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data. 
+ } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + Ok(()) + } + } + + #[derive(Debug)] + struct SecondProcessor { + pub(crate) logs: Arc>>, + } + + impl LogProcessor for SecondProcessor { + fn emit(&self, data: &mut LogData) { + assert!(data.record.attributes.as_ref().map_or(false, |attrs| { + attrs.iter().any(|(key, value)| { + key.as_str() == "processed_by" + && value == &AnyValue::String("FirstProcessor".into()) + }) + })); + assert!( + data.record.body.clone().unwrap() + == AnyValue::String("Updated by FirstProcessor".into()) + ); + self.logs.lock().unwrap().push(data.clone()); + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + Ok(()) + } + } + #[test] + fn test_log_data_modification_by_multiple_processors() { + let first_processor_logs = Arc::new(Mutex::new(Vec::new())); + let second_processor_logs = Arc::new(Mutex::new(Vec::new())); + + let first_processor = FirstProcessor { + logs: Arc::clone(&first_processor_logs), + }; + let second_processor = SecondProcessor { + logs: Arc::clone(&second_processor_logs), + }; + + let logger_provider = LoggerProvider::builder() + .with_log_processor(first_processor) + .with_log_processor(second_processor) + .build(); + + let logger = logger_provider.logger("test-logger"); + let mut log_record = logger.create_log_record(); + log_record.body = Some(AnyValue::String("Test log".into())); + + logger.emit(log_record); + + assert_eq!(first_processor_logs.lock().unwrap().len(), 1); + assert_eq!(second_processor_logs.lock().unwrap().len(), 1); + + let first_log = &first_processor_logs.lock().unwrap()[0]; + let second_log = 
&second_processor_logs.lock().unwrap()[0]; + + assert!(first_log.record.attributes.iter().any(|attrs| { + attrs.iter().any(|(key, value)| { + key.as_str() == "processed_by" + && value == &AnyValue::String("FirstProcessor".into()) + }) + })); + + assert!(second_log.record.attributes.iter().any(|attrs| { + attrs.iter().any(|(key, value)| { + key.as_str() == "processed_by" + && value == &AnyValue::String("FirstProcessor".into()) + }) + })); + assert!( + first_log.record.body.clone().unwrap() + == AnyValue::String("Updated by FirstProcessor".into()) + ); + assert!( + second_log.record.body.clone().unwrap() + == AnyValue::String("Updated by FirstProcessor".into()) + ); + } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 233da0ccbb..8068fafaec 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -175,12 +175,14 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: Vec) -> LogResult<()> { - self.logs - .lock() - .map(|mut logs_guard| logs_guard.append(&mut batch.clone())) - .map_err(LogError::from) + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + let mut logs_guard = self.logs.lock().map_err(LogError::from)?; + for log in batch.into_iter() { + logs_guard.push(log.into_owned()); + } + Ok(()) } + fn shutdown(&mut self) { if self.should_reset_on_shutdown { self.reset(); diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index fcea153133..dacefa3d8b 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -6,6 +6,7 @@ use opentelemetry::{ }; use opentelemetry_sdk::export::logs::{ExportResult, LogData}; use opentelemetry_sdk::Resource; +use std::borrow::Cow; use std::io::{stdout, Write}; type Encoder = @@ -44,9 +45,13 @@ 
impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { + async fn export<'a>(&mut self, batch: Vec>) -> ExportResult { if let Some(writer) = &mut self.writer { - let log_data = crate::logs::transform::LogData::from((batch, &self.resource)); + // TODO - Avoid cloning logdata if it is borrowed. + let log_data = crate::logs::transform::LogData::from(( + batch.into_iter().map(Cow::into_owned).collect(), + &self.resource, + )); let result = (self.encoder)(writer, log_data) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { diff --git a/stress/src/logs.rs b/stress/src/logs.rs index a70e81014d..a4d4d48ae7 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -3,7 +3,7 @@ OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2) Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, RAM: 64.0 GB - 27 M/sec + 39 M/sec */ use opentelemetry_appender_tracing::layer; @@ -17,7 +17,7 @@ mod throughput; pub struct NoOpLogProcessor; impl LogProcessor for NoOpLogProcessor { - fn emit(&self, _data: opentelemetry_sdk::export::logs::LogData) {} + fn emit(&self, _data: &mut opentelemetry_sdk::export::logs::LogData) {} fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { Ok(())