Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test #15

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open

test #15

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3399573
initial commit
lalitb May 8, 2024
72de6a4
fmt
lalitb May 8, 2024
da9707a
doc
lalitb May 8, 2024
dd14edd
tbd->todo
lalitb May 8, 2024
af1b590
fix inmemory-export
lalitb May 9, 2024
99e1109
Merge branch 'log-processor-optimize' of github.com:lalitb/openteleme…
lalitb May 9, 2024
2599b0c
Merge branch 'main' into log-processor-optimize
lalitb May 9, 2024
a043685
Merge branch 'main' into log-processor-optimize
lalitb May 9, 2024
ef5f630
Merge branch 'main' into log-processor-optimize
lalitb May 21, 2024
fe866a9
add unit test
lalitb May 21, 2024
af35d3b
Merge branch 'main' into log-processor-optimize
lalitb May 21, 2024
252c9e7
add changelog
lalitb May 21, 2024
90b4dd4
Merge branch 'main' into log-processor-optimize
lalitb May 21, 2024
b50fd5f
Merge branch 'main' into log-processor-optimize
lalitb May 23, 2024
6a444d3
exporter takes slice to reference to logdata
lalitb May 23, 2024
4564ed6
fix
lalitb May 23, 2024
e0e3a76
Revert "fix"
lalitb May 23, 2024
bde9131
Revert "exporter takes slice to reference to logdata"
lalitb May 23, 2024
95f83e7
Update CHANGELOG.md
lalitb May 23, 2024
00d925c
Update logs.rs
lalitb May 23, 2024
6173552
Update logs.rs
lalitb May 23, 2024
57cbf2e
Merge branch 'main' into log-processor-optimize
lalitb May 23, 2024
8249652
update test
lalitb May 23, 2024
c50db2e
Merge branch 'log-processor-optimize' of github.com:lalitb/openteleme…
lalitb May 23, 2024
563edcc
remove comment
lalitb May 23, 2024
9d016b1
update stress results
lalitb May 23, 2024
af99199
update benchmark
lalitb May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
| noop_layer_disabled | 12 ns |
| noop_layer_enabled | 25 ns |
| ot_layer_disabled | 19 ns |
| ot_layer_enabled | 588 ns |
| ot_layer_enabled | 446 ns |
*/

use async_trait::async_trait;
Expand All @@ -33,7 +33,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&mut self, _: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
LogResult::Ok(())
}

Expand All @@ -54,7 +54,7 @@ impl NoopProcessor {
}

impl LogProcessor for NoopProcessor {
fn emit(&self, _: LogData) {
fn emit(&self, _: &mut LogData) {
// no-op
}

Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
let client = self
.client
.lock()
Expand All @@ -19,7 +19,13 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = inner
Expand All @@ -65,9 +65,11 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

// TODO: Avoid cloning here.
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
logger provider.
- Removed dependency on `ordered-float`.

- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
Update `LogProcessor::emit() method to take mutable reference to LogData. This is breaking
change for LogProcessor developers. If the processor needs to invoke the exporter
asynchronously, it should clone the data to ensure it can be safely processed without
lifetime issues. Any changes made to the log data before cloning in this method will be
reflected in the next log processor in the chain, as well as to the exporter.
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
Update `LogExporter::export() method to accept a batch of log data, which can be either a
reference or owned `LogData`. If the exporter needs to process the log data
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

## v0.23.0

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! run with `$ cargo bench --bench log --features=logs -- --exact <test_name>` to run specific test for logs
//! So to run test named "full-log-with-attributes/with-context" you would run `$ cargo bench --bench log --features=logs -- --exact full-log-with-attributes/with-context`
//! To run all tests for logs you would run `$ cargo bench --bench log --features=logs`
//!

use std::collections::HashMap;
use std::time::SystemTime;

Expand All @@ -19,7 +24,7 @@ struct VoidExporter;

#[async_trait]
impl LogExporter for VoidExporter {
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, _batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
LogResult::Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use opentelemetry::{
logs::{LogError, LogResult},
InstrumentationLibrary,
};
use std::borrow::Cow;
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "logs_level_enabled")]
Expand Down
29 changes: 15 additions & 14 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,21 @@ impl opentelemetry::logs::Logger for Logger {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});
let mut log_record = record;
if let Some(ref trace_context) = trace_context {
log_record.trace_context = Some(trace_context.clone());
}
if log_record.observed_timestamp.is_none() {
log_record.observed_timestamp = Some(SystemTime::now());
}

let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
};

for p in processors {
let mut cloned_record = record.clone();
if let Some(ref trace_context) = trace_context {
cloned_record.trace_context = Some(trace_context.clone());
}
if cloned_record.observed_timestamp.is_none() {
cloned_record.observed_timestamp = Some(SystemTime::now());
}
let data = LogData {
record: cloned_record,
instrumentation: self.instrumentation_library().clone(),
};
p.emit(data);
p.emit(&mut data);
}
}

Expand Down Expand Up @@ -326,7 +327,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: LogData) {
fn emit(&self, _data: &mut LogData) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -561,7 +562,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: LogData) {
fn emit(&self, _data: &mut LogData) {
// nothing to do.
}

Expand Down
Loading
Loading