Skip to content

Commit

Permalink
Add cancellation checks to potentially heavy loops in query planning
Browse files Browse the repository at this point in the history
  • Loading branch information
sachindshinde committed Feb 28, 2025
1 parent d740eea commit e92bc68
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 1 deletion.
16 changes: 15 additions & 1 deletion apollo-federation/src/query_graph/graph_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2978,6 +2978,7 @@ impl OpGraphPath {
context: &OpGraphPathContext,
condition_resolver: &mut impl ConditionResolver,
override_conditions: &EnabledOverrideConditions,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<(Option<Vec<SimultaneousPaths>>, Option<bool>), FederationError> {
let span = debug_span!(
"Trying to advance directly",
Expand Down Expand Up @@ -3248,6 +3249,7 @@ impl OpGraphPath {
&implementation_inline_fragment.into(),
condition_resolver,
override_conditions,
check_cancellation,
)?;
// If we find no options for that implementation, we bail (as we need to
// simultaneously advance all implementations).
Expand Down Expand Up @@ -3281,6 +3283,7 @@ impl OpGraphPath {
operation_element,
condition_resolver,
override_conditions,
check_cancellation,
)?;
let Some(field_options_for_implementation) =
field_options_for_implementation
Expand Down Expand Up @@ -3317,6 +3320,7 @@ impl OpGraphPath {
}
let all_options = SimultaneousPaths::flat_cartesian_product(
options_for_each_implementation,
check_cancellation,
)?;
if let Some(interface_path) = interface_path {
let (interface_path, all_options) =
Expand Down Expand Up @@ -3457,6 +3461,7 @@ impl OpGraphPath {
&implementation_inline_fragment.into(),
condition_resolver,
override_conditions,
check_cancellation,
)?;
let Some(implementation_options) = implementation_options else {
drop(guard);
Expand All @@ -3479,6 +3484,7 @@ impl OpGraphPath {
}
let all_options = SimultaneousPaths::flat_cartesian_product(
options_for_each_implementation,
check_cancellation,
)?;
debug!("Type-exploded options: {}", DisplaySlice(&all_options));
Ok((Some(all_options), None))
Expand Down Expand Up @@ -3745,6 +3751,7 @@ impl SimultaneousPaths {
/// the options for the `SimultaneousPaths` as a whole.
fn flat_cartesian_product(
options_for_each_path: Vec<Vec<SimultaneousPaths>>,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<Vec<SimultaneousPaths>, FederationError> {
// This can be written more tersely with a bunch of `reduce()`/`flat_map()`s and friends,
// but when interfaces type-explode into many implementations, this can end up with fairly
Expand Down Expand Up @@ -3775,6 +3782,7 @@ impl SimultaneousPaths {

// Compute the cartesian product.
for _ in 0..num_options {
check_cancellation()?;
let num_simultaneous_paths = options_for_each_path
.iter()
.zip(&option_indexes)
Expand Down Expand Up @@ -3940,6 +3948,7 @@ impl SimultaneousPathsWithLazyIndirectPaths {
operation_element: &OpPathElement,
condition_resolver: &mut impl ConditionResolver,
override_conditions: &EnabledOverrideConditions,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<Option<Vec<SimultaneousPathsWithLazyIndirectPaths>>, FederationError> {
debug!(
"Trying to advance paths for operation: path = {}, operation = {operation_element}",
Expand All @@ -3954,6 +3963,7 @@ impl SimultaneousPathsWithLazyIndirectPaths {
// references to `self`, which means cloning these paths when iterating.
let paths = self.paths.0.clone();
for (path_index, path) in paths.iter().enumerate() {
check_cancellation()?;
debug!("Computing options for {path}");
let span = debug_span!(" |");
let gaurd = span.enter();
Expand All @@ -3971,6 +3981,7 @@ impl SimultaneousPathsWithLazyIndirectPaths {
&updated_context,
condition_resolver,
override_conditions,
check_cancellation,
)?;
debug!("{advance_options:?}");
drop(gaurd);
Expand Down Expand Up @@ -4043,6 +4054,7 @@ impl SimultaneousPathsWithLazyIndirectPaths {
&updated_context,
condition_resolver,
override_conditions,
check_cancellation,
)?;
// If we can't advance the operation element after that path, ignore it,
// it's just not an option.
Expand Down Expand Up @@ -4131,6 +4143,7 @@ impl SimultaneousPathsWithLazyIndirectPaths {
&updated_context,
condition_resolver,
override_conditions,
check_cancellation,
)?;
options = advance_options.unwrap_or_else(Vec::new);
debug!("{options:?}");
Expand All @@ -4147,7 +4160,8 @@ impl SimultaneousPathsWithLazyIndirectPaths {
}
}

let all_options = SimultaneousPaths::flat_cartesian_product(options_for_each_path)?;
let all_options =
SimultaneousPaths::flat_cartesian_product(options_for_each_path, check_cancellation)?;
debug!("{all_options:?}");
Ok(Some(self.create_lazy_options(all_options, updated_context)))
}
Expand Down
10 changes: 10 additions & 0 deletions apollo-federation/src/query_plan/fetch_dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3493,6 +3493,7 @@ pub(crate) fn compute_nodes_for_tree(
initial_node_path: FetchDependencyGraphNodePath,
initial_defer_context: DeferContext,
initial_conditions: &OpGraphPathContext,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<IndexSet<NodeIndex>, FederationError> {
snapshot!("OpPathTree", initial_tree.to_string(), "path_tree");
let mut stack = vec![ComputeNodesStackItem {
Expand All @@ -3505,6 +3506,7 @@ pub(crate) fn compute_nodes_for_tree(
}];
let mut created_nodes = IndexSet::default();
while let Some(stack_item) = stack.pop() {
check_cancellation()?;
let node =
FetchDependencyGraph::node_weight_mut(&mut dependency_graph.graph, stack_item.node_id)?;
for selection_set in &stack_item.tree.local_selection_sets {
Expand Down Expand Up @@ -3550,6 +3552,7 @@ pub(crate) fn compute_nodes_for_tree(
edge_id,
new_context,
&mut created_nodes,
check_cancellation,
)?);
}
QueryGraphEdgeTransition::RootTypeResolution { root_kind } => {
Expand Down Expand Up @@ -3577,6 +3580,7 @@ pub(crate) fn compute_nodes_for_tree(
child,
operation,
&mut created_nodes,
check_cancellation,
)?);
}
}
Expand All @@ -3601,6 +3605,7 @@ fn compute_nodes_for_key_resolution<'a>(
edge_id: EdgeIndex,
new_context: &'a OpGraphPathContext,
created_nodes: &mut IndexSet<NodeIndex>,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<ComputeNodesStackItem<'a>, FederationError> {
let edge = stack_item.tree.graph.edge_weight(edge_id)?;
let Some(conditions) = &child.conditions else {
Expand All @@ -3616,6 +3621,7 @@ fn compute_nodes_for_key_resolution<'a>(
stack_item.node_path.clone(),
stack_item.defer_context.for_conditions(),
&Default::default(),
check_cancellation,
)?;
created_nodes.extend(conditions_nodes.iter().copied());
// Then we can "take the edge", creating a new node.
Expand Down Expand Up @@ -3854,6 +3860,7 @@ fn compute_nodes_for_op_path_element<'a>(
child: &'a Arc<PathTreeChild<OpGraphPathTrigger, Option<EdgeIndex>>>,
operation_element: &OpPathElement,
created_nodes: &mut IndexSet<NodeIndex>,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<ComputeNodesStackItem<'a>, FederationError> {
let Some(edge_id) = child.edge else {
// A null edge means that the operation does nothing
Expand Down Expand Up @@ -3960,6 +3967,7 @@ fn compute_nodes_for_op_path_element<'a>(
},
&updated.defer_context,
created_nodes,
check_cancellation,
)?;

if let Some(matching_context_ids) = &child.matching_context_ids {
Expand Down Expand Up @@ -4437,6 +4445,7 @@ fn handle_conditions_tree(
query_graph_edge_id_if_typename_needed: Option<EdgeIndex>,
defer_context: &DeferContext,
created_nodes: &mut IndexSet<NodeIndex>,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<ConditionsNodeData, FederationError> {
// In many cases, we can optimize conditions by merging the fields into previously existing
// nodes. However, we only do this when the current node has only a single parent (it's hard to
Expand Down Expand Up @@ -4520,6 +4529,7 @@ fn handle_conditions_tree(
fetch_node_path.clone(),
defer_context.for_conditions(),
&OpGraphPathContext::default(),
check_cancellation,
)?;

if newly_created_node_ids.is_empty() {
Expand Down
3 changes: 3 additions & 0 deletions apollo-federation/src/query_plan/query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ fn compute_root_serial_dependency_graph(
&mut fetch_dependency_graph,
&prev_path,
parameters.config.type_conditioned_fetching,
&|| parameters.check_cancellation(),
)?;
} else {
// PORT_NOTE: It is unclear if they correct thing to do here is get the next ID, use
Expand Down Expand Up @@ -611,6 +612,7 @@ pub(crate) fn compute_root_fetch_groups(
dependency_graph: &mut FetchDependencyGraph,
path: &OpPathTree,
type_conditioned_fetching_enabled: bool,
check_cancellation: &impl Fn() -> Result<(), SingleFederationError>,
) -> Result<(), FederationError> {
// The root of the pathTree is one of the "fake" root of the subgraphs graph,
// which belongs to no subgraph but points to each ones.
Expand Down Expand Up @@ -660,6 +662,7 @@ pub(crate) fn compute_root_fetch_groups(
)?,
Default::default(),
&Default::default(),
check_cancellation,
)?;
}
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions apollo-federation/src/query_plan/query_planning_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,13 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> {
);

for option in options.iter_mut() {
self.parameters.check_cancellation()?;
let followups_for_option = option.advance_with_operation_element(
self.parameters.supergraph_schema.clone(),
&operation_element,
/*resolver*/ self,
&self.parameters.override_conditions,
&|| self.parameters.check_cancellation(),
)?;
let Some(followups_for_option) = followups_for_option else {
// There is no valid way to advance the current operation element from this option
Expand Down Expand Up @@ -1058,6 +1060,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> {
dependency_graph,
path_tree,
type_conditioned_fetching_enabled,
&|| self.parameters.check_cancellation(),
)?;
} else {
let query_graph_node = path_tree.graph.node_weight(path_tree.node)?;
Expand Down Expand Up @@ -1095,6 +1098,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> {
)?,
Default::default(),
&Default::default(),
&|| self.parameters.check_cancellation(),
)?;
}

Expand Down

0 comments on commit e92bc68

Please sign in to comment.