Skip to content

Commit

Permalink
fix(swordfish): Track future poll times for explain analyze (#3511)
Browse files Browse the repository at this point in the history
The explain analyze output and tracing spans don't properly track the
execution tasks. All the CPU times were showing up as 0.0, and the spans
weren't showing up in traces.

This is because the `execute`, `sink`, and `finalize` methods now return
futures, whose timings are currently not tracked. This PR changes that so
the poll times of those futures are now tracked.

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
  • Loading branch information
colin-ho and Colin Ho authored Jan 8, 2025
1 parent eeacc47 commit e6c084f
Show file tree
Hide file tree
Showing 25 changed files with 504 additions and 379 deletions.
28 changes: 16 additions & 12 deletions src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use daft_micropartition::python::PyMicroPartition;
use daft_micropartition::MicroPartition;
#[cfg(feature = "python")]
use pyo3::prelude::*;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -162,17 +162,21 @@ impl IntermediateOperator for ActorPoolProjectOperator {
task_spawner: &ExecutionTaskSpawner,
) -> IntermediateOpExecuteResult {
let memory_request = self.memory_request;
let fut = task_spawner.spawn_with_memory_request(memory_request, async move {
let actor_pool_project_state = state
.as_any_mut()
.downcast_mut::<ActorPoolProjectState>()
.expect("ActorPoolProjectState");
let res = actor_pool_project_state
.actor_handle
.eval_input(input)
.map(|result| IntermediateOperatorResult::NeedMoreInput(Some(result)))?;
Ok((state, res))
});
let fut = task_spawner.spawn_with_memory_request(
memory_request,
async move {
let actor_pool_project_state = state
.as_any_mut()
.downcast_mut::<ActorPoolProjectState>()
.expect("ActorPoolProjectState");
let res = actor_pool_project_state
.actor_handle
.eval_input(input)
.map(|result| IntermediateOperatorResult::NeedMoreInput(Some(result)))?;
Ok((state, res))
},
Span::current(),
);
fut.into()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use daft_dsl::ExprRef;
use daft_logical_plan::JoinType;
use daft_micropartition::MicroPartition;
use daft_table::{GrowableTable, ProbeState, Probeable};
use tracing::{info_span, instrument};
use tracing::{info_span, instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -130,16 +130,19 @@ impl IntermediateOperator for AntiSemiProbeOperator {

let params = self.params.clone();
task_spawner
.spawn(async move {
let probe_state = state
.as_any_mut()
.downcast_mut::<AntiSemiProbeState>()
.expect("AntiSemiProbeState should be used with AntiSemiProbeOperator");
let probeable = probe_state.get_or_await_probeable().await;
let res =
Self::probe_anti_semi(&params.probe_on, &probeable, &input, params.is_semi);
Ok((state, IntermediateOperatorResult::NeedMoreInput(Some(res?))))
})
.spawn(
async move {
let probe_state = state
.as_any_mut()
.downcast_mut::<AntiSemiProbeState>()
.expect("AntiSemiProbeState should be used with AntiSemiProbeOperator");
let probeable = probe_state.get_or_await_probeable().await;
let res =
Self::probe_anti_semi(&params.probe_on, &probeable, &input, params.is_semi);
Ok((state, IntermediateOperatorResult::NeedMoreInput(Some(res?))))
},
Span::current(),
)
.into()
}

Expand Down
6 changes: 3 additions & 3 deletions src/daft-local-execution/src/intermediate_ops/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use common_error::DaftResult;
use daft_core::{join::JoinSide, prelude::SchemaRef};
use daft_micropartition::MicroPartition;
use daft_table::Table;
use tracing::{info_span, instrument, Instrument};
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -132,8 +132,8 @@ impl IntermediateOperator for CrossJoinOperator {
IntermediateOperatorResult::HasMoreOutput(output_morsel)
};
Ok((state, result))
}
.instrument(info_span!("CrossJoinOperator::execute")),
},
Span::current(),
)
.into()
}
Expand Down
19 changes: 11 additions & 8 deletions src/daft-local-execution/src/intermediate_ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use daft_dsl::ExprRef;
use daft_functions::list::explode;
use daft_micropartition::MicroPartition;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -33,13 +33,16 @@ impl IntermediateOperator for ExplodeOperator {
) -> IntermediateOpExecuteResult {
let to_explode = self.to_explode.clone();
task_spawner
.spawn(async move {
let out = input.explode(&to_explode)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
})
.spawn(
async move {
let out = input.explode(&to_explode)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
},
Span::current(),
)
.into()
}

Expand Down
19 changes: 11 additions & 8 deletions src/daft-local-execution/src/intermediate_ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -30,13 +30,16 @@ impl IntermediateOperator for FilterOperator {
) -> IntermediateOpExecuteResult {
let predicate = self.predicate.clone();
task_spawner
.spawn(async move {
let out = input.filter(&[predicate])?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
})
.spawn(
async move {
let out = input.filter(&[predicate])?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
},
Span::current(),
)
.into()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_table::{GrowableTable, ProbeState};
use indexmap::IndexSet;
use tracing::{info_span, instrument};
use tracing::{info_span, instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -184,25 +184,28 @@ impl IntermediateOperator for InnerHashJoinProbeOperator {

let params = self.params.clone();
task_spawner
.spawn(async move {
let inner_join_state = state
.spawn(
async move {
let inner_join_state = state
.as_any_mut()
.downcast_mut::<InnerHashJoinProbeState>()
.expect(
"InnerHashJoinProbeState should be used with InnerHashJoinProbeOperator",
);
let probe_state = inner_join_state.get_or_await_probe_state().await;
let res = Self::probe_inner(
&input,
&probe_state,
&params.probe_on,
&params.common_join_keys,
&params.left_non_join_columns,
&params.right_non_join_columns,
params.build_on_left,
);
Ok((state, IntermediateOperatorResult::NeedMoreInput(Some(res?))))
})
let probe_state = inner_join_state.get_or_await_probe_state().await;
let res = Self::probe_inner(
&input,
&probe_state,
&params.probe_on,
&params.common_join_keys,
&params.left_non_join_columns,
&params.right_non_join_columns,
params.build_on_left,
);
Ok((state, IntermediateOperatorResult::NeedMoreInput(Some(res?))))
},
Span::current(),
)
.into()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,12 @@ impl IntermediateNode {
) -> DaftResult<()> {
let span = info_span!("IntermediateOp::execute");
let compute_runtime = get_compute_runtime();
let task_spawner = ExecutionTaskSpawner::new(compute_runtime, memory_manager);
let task_spawner =
ExecutionTaskSpawner::new(compute_runtime, memory_manager, rt_context, span);
let mut state = op.make_state()?;
while let Some(morsel) = receiver.recv().await {
loop {
let result = rt_context
.in_span(&span, || op.execute(morsel.clone(), state, &task_spawner))
.await??;
let result = op.execute(morsel.clone(), state, &task_spawner).await??;
state = result.0;
match result.1 {
IntermediateOperatorResult::NeedMoreInput(Some(mp)) => {
Expand Down
20 changes: 12 additions & 8 deletions src/daft-local-execution/src/intermediate_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use common_error::{DaftError, DaftResult};
use daft_dsl::{functions::python::get_resource_request, ExprRef};
use daft_micropartition::MicroPartition;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -40,13 +40,17 @@ impl IntermediateOperator for ProjectOperator {
let projection = self.projection.clone();
let memory_request = self.memory_request;
task_spawner
.spawn_with_memory_request(memory_request, async move {
let out = input.eval_expression_list(&projection)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
})
.spawn_with_memory_request(
memory_request,
async move {
let out = input.eval_expression_list(&projection)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
},
Span::current(),
)
.into()
}

Expand Down
27 changes: 15 additions & 12 deletions src/daft-local-execution/src/intermediate_ops/sample.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use daft_micropartition::MicroPartition;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -41,17 +41,20 @@ impl IntermediateOperator for SampleOperator {
) -> IntermediateOpExecuteResult {
let params = self.params.clone();
task_spawner
.spawn(async move {
let out = input.sample_by_fraction(
params.fraction,
params.with_replacement,
params.seed,
)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
})
.spawn(
async move {
let out = input.sample_by_fraction(
params.fraction,
params.with_replacement,
params.seed,
)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
},
Span::current(),
)
.into()
}

Expand Down
29 changes: 16 additions & 13 deletions src/daft-local-execution/src/intermediate_ops/unpivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use tracing::instrument;
use tracing::{instrument, Span};

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
Expand Down Expand Up @@ -48,18 +48,21 @@ impl IntermediateOperator for UnpivotOperator {
) -> IntermediateOpExecuteResult {
let params = self.params.clone();
task_spawner
.spawn(async move {
let out = input.unpivot(
&params.ids,
&params.values,
&params.variable_name,
&params.value_name,
)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
})
.spawn(
async move {
let out = input.unpivot(
&params.ids,
&params.values,
&params.variable_name,
&params.value_name,
)?;
Ok((
state,
IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))),
))
},
Span::current(),
)
.into()
}

Expand Down
Loading

0 comments on commit e6c084f

Please sign in to comment.