From 2090fc8bac505c3b857c6f58656f822455ee179f Mon Sep 17 00:00:00 2001 From: Simon Sapin Date: Thu, 20 Feb 2025 20:31:48 +0100 Subject: [PATCH 1/4] Initial scaffolding for cooperative planner cancellation --- apollo-federation/src/error/mod.rs | 3 ++ .../src/query_plan/query_planner.rs | 24 +++++++++++- .../query_plan/query_planning_traversal.rs | 17 +++++++++ .../build_query_plan_tests/overrides.rs | 9 +++-- .../overrides/shareable.rs | 6 ++- apollo-router/src/compute_job.rs | 38 ++++++++++++++++--- apollo-router/src/introspection.rs | 2 +- .../query_planner/query_planner_service.rs | 4 +- .../src/services/layers/query_analysis.rs | 32 ++++++++-------- 9 files changed, 105 insertions(+), 30 deletions(-) diff --git a/apollo-federation/src/error/mod.rs b/apollo-federation/src/error/mod.rs index 59c72d5389a..da52f45daab 100644 --- a/apollo-federation/src/error/mod.rs +++ b/apollo-federation/src/error/mod.rs @@ -300,6 +300,8 @@ pub enum SingleFederationError { DeferredSubscriptionUnsupported, #[error("{message}")] QueryPlanComplexityExceeded { message: String }, + #[error("the caller requested cancellation")] + PlanningCancelled, } impl SingleFederationError { @@ -494,6 +496,7 @@ impl SingleFederationError { SingleFederationError::QueryPlanComplexityExceeded { .. } => { ErrorCode::QueryPlanComplexityExceededError } + SingleFederationError::PlanningCancelled => ErrorCode::Internal, } } } diff --git a/apollo-federation/src/query_plan/query_planner.rs b/apollo-federation/src/query_plan/query_planner.rs index 95d4b72102b..6dea6364bbc 100644 --- a/apollo-federation/src/query_plan/query_planner.rs +++ b/apollo-federation/src/query_plan/query_planner.rs @@ -1,5 +1,6 @@ use std::cell::Cell; use std::num::NonZeroU32; +use std::ops::ControlFlow; use std::ops::Deref; use std::sync::Arc; @@ -170,8 +171,8 @@ pub struct QueryPlanningStatistics { pub evaluated_plan_paths: Cell, } -#[derive(Debug, Default, Clone)] -pub struct QueryPlanOptions { +#[derive(Default, Clone)] +pub struct QueryPlanOptions<'a> { /// A set of labels which will be used _during query planning_ to /// enable/disable edges with a matching label in their override condition. /// Edges with override conditions require their label to be present or absent @@ -179,6 +180,24 @@ pub struct QueryPlanOptions { /// progressive @override feature. // PORT_NOTE: In JS implementation this was a Map pub override_conditions: Vec, + + pub check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>, +} + +impl std::fmt::Debug for QueryPlanOptions<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QueryPlanOptions") + .field("override_conditions", &self.override_conditions) + .field( + "check_for_cooperative_cancellation", + if self.check_for_cooperative_cancellation.is_some() { + &"Some(...)" + } else { + &"None" + }, + ) + .finish() + } } #[derive(Debug, Default, Clone)] @@ -414,6 +433,7 @@ impl QueryPlanner { override_conditions: EnabledOverrideConditions(IndexSet::from_iter( options.override_conditions, )), + check_for_cooperative_cancellation: options.check_for_cooperative_cancellation, fetch_id_generator: Arc::new(FetchIdGenerator::new()), }; diff --git a/apollo-federation/src/query_plan/query_planning_traversal.rs b/apollo-federation/src/query_plan/query_planning_traversal.rs index c30fdd6b916..50bb9705312 100644 --- a/apollo-federation/src/query_plan/query_planning_traversal.rs +++ b/apollo-federation/src/query_plan/query_planning_traversal.rs @@ -1,3 +1,4 @@ +use std::ops::ControlFlow; use std::sync::Arc; use apollo_compiler::collections::IndexSet; @@ -87,6 +88,20 @@ pub(crate) struct QueryPlanningParameters<'a> { pub(crate) config: QueryPlannerConfig, pub(crate) statistics: &'a QueryPlanningStatistics, pub(crate) override_conditions: EnabledOverrideConditions, + pub(crate) check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>, +} + +impl QueryPlanningParameters<'_> { + pub(crate) fn check_cancellation(&self) -> Result<(), SingleFederationError> { + if let Some(check) = self.check_for_cooperative_cancellation { + match check() { + ControlFlow::Continue(()) => Ok(()), + ControlFlow::Break(()) => Err(SingleFederationError::PlanningCancelled), + } + } else { + Ok(()) + } + } } pub(crate) struct QueryPlanningTraversal<'a, 'b> { @@ -339,6 +354,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> { )] fn find_best_plan_inner(&mut self) -> Result, FederationError> { while !self.open_branches.is_empty() { + self.parameters.check_cancellation()?; snapshot!( "OpenBranches", snapshot_helper::open_branches_to_string(&self.open_branches), @@ -1130,6 +1146,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> { statistics: self.parameters.statistics, override_conditions: self.parameters.override_conditions.clone(), fetch_id_generator: self.parameters.fetch_id_generator.clone(), + check_for_cooperative_cancellation: self.parameters.check_for_cooperative_cancellation, }; let best_plan_opt = QueryPlanningTraversal::new_inner( ¶meters, diff --git a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs index c0f1b91a313..5ac1e91a959 100644 --- a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs +++ b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs @@ -24,7 +24,8 @@ fn it_handles_progressive_override_on_root_fields() { } "#, QueryPlanOptions { - override_conditions: vec!["test".to_string()] + override_conditions: vec!["test".to_string()], + check_for_cooperative_cancellation: None, }, @r###" QueryPlan { @@ -117,7 +118,8 @@ fn it_handles_progressive_override_on_entity_fields() { } "#, QueryPlanOptions { - override_conditions: vec!["test".to_string()] + override_conditions: vec!["test".to_string()], + check_for_cooperative_cancellation: None, }, @r###" QueryPlan { @@ -276,7 +278,8 @@ fn it_handles_progressive_override_on_nested_entity_fields() { } "#, QueryPlanOptions { - override_conditions: vec!["test".to_string()] + override_conditions: vec!["test".to_string()], + check_for_cooperative_cancellation: None, }, @r###" QueryPlan { diff --git a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs index 103017912e6..ccb9a674163 100644 --- a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs +++ b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs @@ -45,7 +45,8 @@ fn it_overrides_to_s2_when_label_is_provided() { } "#, QueryPlanOptions { - override_conditions: vec!["test".to_string()] + override_conditions: vec!["test".to_string()], + check_for_cooperative_cancellation: None, }, @r###" QueryPlan { @@ -157,7 +158,8 @@ fn it_overrides_f1_to_s3_when_label_is_provided() { } "#, QueryPlanOptions { - override_conditions: vec!["test".to_string()] + override_conditions: vec!["test".to_string()], + check_for_cooperative_cancellation: None, }, @r###" QueryPlan { diff --git a/apollo-router/src/compute_job.rs b/apollo-router/src/compute_job.rs index 69c753d7997..c2f752b4632 100644 --- a/apollo-router/src/compute_job.rs +++ b/apollo-router/src/compute_job.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::ops::ControlFlow; use std::panic::UnwindSafe; use std::sync::OnceLock; @@ -32,6 +33,31 @@ fn thread_pool_size() -> usize { } } +pub(crate) struct JobStatus<'a, T> { + result_sender: &'a oneshot::Sender>, +} + +impl JobStatus<'_, T> { + /// Checks whether the oneshot receiver for the result of the job was dropped, + /// which means nothing is expecting the result anymore. + /// + /// This can happen if the Tokio task owning it is cancelled, + /// such as if a supergraph client disconnects or if a request times out. + /// + /// In this case, a long-running job should try to cancel itself + /// to avoid needless resource consumption. + pub(crate) fn check_for_cooperative_cancellation(&self) -> ControlFlow<()> { + if self.result_sender.is_closed() { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + } +} + +/// We expect calling `oneshot::Sender::is_closed` to never leave the sender in a broken state. +impl UnwindSafe for JobStatus<'_, T> {} + /// Compute job queue is full #[derive(thiserror::Error, Debug, displaydoc::Display, Clone)] pub(crate) struct ComputeBackPressureError; @@ -101,13 +127,15 @@ pub(crate) fn execute( job: F, ) -> Result>, ComputeBackPressureError> where - F: FnOnce() -> T + Send + UnwindSafe + 'static, + F: FnOnce(JobStatus<'_, T>) -> T + Send + UnwindSafe + 'static, T: Send + 'static, { let (tx, rx) = oneshot::channel(); let job = Box::new(move || { + let status = JobStatus { result_sender: &tx }; + let result = std::panic::catch_unwind(move || job(status)); // Ignore the error if the oneshot receiver was dropped - let _ = tx.send(std::panic::catch_unwind(job)); + let _ = tx.send(result); }); let queue = queue(); queue.send(priority, job).map_err(|e| match e { @@ -156,7 +184,7 @@ mod tests { #[tokio::test] async fn test_executes_on_different_thread() { let test_thread = std::thread::current().id(); - let job_thread = execute(Priority::P4, || std::thread::current().id()) + let job_thread = execute(Priority::P4, |_| std::thread::current().id()) .unwrap() .await .unwrap(); @@ -169,12 +197,12 @@ mod tests { return; } let start = Instant::now(); - let one = execute(Priority::P8, || { + let one = execute(Priority::P8, |_| { std::thread::sleep(Duration::from_millis(1_000)); 1 }) .unwrap(); - let two = execute(Priority::P8, || { + let two = execute(Priority::P8, |_| { std::thread::sleep(Duration::from_millis(1_000)); 1 + 1 }) diff --git a/apollo-router/src/introspection.rs b/apollo-router/src/introspection.rs index 6f796109158..d70ff432649 100644 --- a/apollo-router/src/introspection.rs +++ b/apollo-router/src/introspection.rs @@ -160,7 +160,7 @@ impl IntrospectionCache { let schema = schema.clone(); let doc = doc.clone(); let priority = compute_job::Priority::P1; // Low priority - let response = compute_job::execute(priority, move || { + let response = compute_job::execute(priority, move |_| { Self::execute_introspection(max_depth, &schema, &doc) })? // `expect()` propagates any panic that potentially happens in the closure, but: diff --git a/apollo-router/src/query_planner/query_planner_service.rs b/apollo-router/src/query_planner/query_planner_service.rs index 9226bba70e7..11522b67c27 100644 --- a/apollo-router/src/query_planner/query_planner_service.rs +++ b/apollo-router/src/query_planner/query_planner_service.rs @@ -132,11 +132,13 @@ impl QueryPlannerService { let doc = doc.clone(); let rust_planner = self.planner.clone(); let priority = compute_job::Priority::P8; // High priority - let job = move || -> Result<_, QueryPlannerError> { + let job = move |status: compute_job::JobStatus<'_, _>| -> Result<_, QueryPlannerError> { let start = Instant::now(); + let check = move || status.check_for_cooperative_cancellation(); let query_plan_options = QueryPlanOptions { override_conditions: plan_options.override_conditions, + check_for_cooperative_cancellation: Some(&check), }; let result = operation diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index f8655447154..b644489f9a8 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -94,12 +94,15 @@ impl QueryAnalysisLayer { let schema = self.schema.clone(); let conf = self.configuration.clone(); - // Must be created *outside* of the spawn_blocking or the span is not connected to the - // parent + // Must be created *outside* of the compute_job or the span is not connected to the parent let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL"); + // TODO: is this correct? + let span = std::panic::AssertUnwindSafe(span); + let conf = std::panic::AssertUnwindSafe(conf); + let priority = compute_job::Priority::P4; // Medium priority - let job = move || { + compute_job::execute(priority, move |_| { span.in_scope(|| { Query::parse_document( &query, @@ -108,19 +111,16 @@ impl QueryAnalysisLayer { conf.as_ref(), ) }) - }; - // TODO: is this correct? - let job = std::panic::AssertUnwindSafe(job); - compute_job::execute(priority, job) - .map_err(MaybeBackPressureError::TemporaryError)? - .await - // `expect()` propagates any panic that potentially happens in the closure, but: - // - // * We try to avoid such panics in the first place and consider them bugs - // * The panic handler in `apollo-router/src/executable.rs` exits the process - // so this error case should never be reached. - .expect("Query::parse_document panicked") - .map_err(MaybeBackPressureError::PermanentError) + }) + .map_err(MaybeBackPressureError::TemporaryError)? + .await + // `expect()` propagates any panic that potentially happens in the closure, but: + // + // * We try to avoid such panics in the first place and consider them bugs + // * The panic handler in `apollo-router/src/executable.rs` exits the process + // so this error case should never be reached. + .expect("Query::parse_document panicked") + .map_err(MaybeBackPressureError::PermanentError) } /// Parses the GraphQL in the supergraph request and computes Apollo usage references. From adc76b0d6d1e2c94bb35bd38c8497a65276a41cd Mon Sep 17 00:00:00 2001 From: "Sachin D. Shinde" Date: Fri, 28 Feb 2025 10:26:36 -0800 Subject: [PATCH 2/4] Add cancellation checks to potentially heavy loops in query planning --- apollo-federation/src/operation/mod.rs | 23 +++++++++---- apollo-federation/src/operation/tests/mod.rs | 32 ++++++++++++++++++- .../src/query_graph/graph_path.rs | 16 +++++++++- .../src/query_graph/path_tree.rs | 11 +++++-- .../src/query_plan/fetch_dependency_graph.rs | 10 ++++++ .../src/query_plan/query_planner.rs | 8 +++++ .../query_plan/query_planning_traversal.rs | 12 ++++++- 7 files changed, 100 insertions(+), 12 deletions(-) diff --git a/apollo-federation/src/operation/mod.rs b/apollo-federation/src/operation/mod.rs index 1a3adcf1a97..3e05bb025af 100644 --- a/apollo-federation/src/operation/mod.rs +++ b/apollo-federation/src/operation/mod.rs @@ -1541,9 +1541,12 @@ impl SelectionSet { Ok(()) } - pub(crate) fn expand_all_fragments(&self) -> Result { + pub(crate) fn expand_all_fragments( + &self, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + ) -> Result { let mut expanded_selections = vec![]; - SelectionSet::expand_selection_set(&mut expanded_selections, self)?; + SelectionSet::expand_selection_set(&mut expanded_selections, self, check_cancellation)?; let mut expanded = SelectionSet { schema: self.schema.clone(), @@ -1557,12 +1560,14 @@ impl SelectionSet { fn expand_selection_set( destination: &mut Vec, selection_set: &SelectionSet, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result<(), FederationError> { for value in selection_set.selections.values() { + check_cancellation()?; match value { Selection::Field(field_selection) => { let selections = match &field_selection.selection_set { - Some(s) => Some(s.expand_all_fragments()?), + Some(s) => Some(s.expand_all_fragments(check_cancellation)?), None => None, }; destination.push(Selection::from_field( @@ -1580,12 +1585,14 @@ impl SelectionSet { SelectionSet::expand_selection_set( destination, &spread_selection.selection_set, + check_cancellation, )?; } else { // convert to inline fragment let expanded = InlineFragmentSelection::from_fragment_spread_selection( selection_set.type_position.clone(), // the parent type of this inline selection spread_selection, + check_cancellation, )?; destination.push(Selection::InlineFragment(Arc::new(expanded))); } @@ -1594,7 +1601,9 @@ impl SelectionSet { destination.push( InlineFragmentSelection::new( inline_selection.inline_fragment.clone(), - inline_selection.selection_set.expand_all_fragments()?, + inline_selection + .selection_set + .expand_all_fragments(check_cancellation)?, ) .into(), ); @@ -2716,6 +2725,7 @@ impl InlineFragmentSelection { pub(crate) fn from_fragment_spread_selection( parent_type_position: CompositeTypeDefinitionPosition, fragment_spread_selection: &Arc, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result { let schema = fragment_spread_selection.spread.schema.schema(); for directive in fragment_spread_selection.spread.directives.iter() { @@ -2753,7 +2763,7 @@ impl InlineFragmentSelection { }, fragment_spread_selection .selection_set - .expand_all_fragments()?, + .expand_all_fragments(check_cancellation)?, )) } @@ -3749,10 +3759,11 @@ pub(crate) fn normalize_operation( named_fragments: NamedFragments, schema: &ValidFederationSchema, interface_types_with_interface_objects: &IndexSet, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result { let mut normalized_selection_set = SelectionSet::from_selection_set(&operation.selection_set, &named_fragments, schema)?; - normalized_selection_set = normalized_selection_set.expand_all_fragments()?; + normalized_selection_set = normalized_selection_set.expand_all_fragments(check_cancellation)?; // We clear up the fragments since we've expanded all. // Also note that expanding fragment usually generate unnecessary fragments/inefficient // selections, so it basically always make sense to flatten afterwards. Besides, fragment diff --git a/apollo-federation/src/operation/tests/mod.rs b/apollo-federation/src/operation/tests/mod.rs index 592fc6f6dc2..6ade590ca6b 100644 --- a/apollo-federation/src/operation/tests/mod.rs +++ b/apollo-federation/src/operation/tests/mod.rs @@ -60,7 +60,13 @@ pub(super) fn parse_and_expand( .expect("must have anonymous operation"); let fragments = NamedFragments::new(&doc.fragments, schema); - normalize_operation(operation, fragments, schema, &Default::default()) + normalize_operation( + operation, + fragments, + schema, + &Default::default(), + &|| Ok(()), + ) } #[test] @@ -100,6 +106,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); normalized_operation.named_fragments = Default::default(); @@ -154,6 +161,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); normalized_operation.named_fragments = Default::default(); @@ -196,6 +204,7 @@ type Query { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); @@ -231,6 +240,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -274,6 +284,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -320,6 +331,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -364,6 +376,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -410,6 +423,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) { @@ -461,6 +475,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -527,6 +542,7 @@ type V { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -586,6 +602,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -632,6 +649,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -682,6 +700,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -730,6 +749,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -778,6 +798,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) { @@ -830,6 +851,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -898,6 +920,7 @@ type V { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query Test { @@ -944,6 +967,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query TestQuery { @@ -983,6 +1007,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); let expected = r#"query TestQuery { @@ -1033,6 +1058,7 @@ scalar FieldSet NamedFragments::new(&executable_document.fragments, &schema), &schema, &interface_objects, + &|| Ok(()), ) .unwrap(); let expected = r#"query TestQuery { @@ -1172,6 +1198,7 @@ mod make_selection_tests { Default::default(), &schema, &Default::default(), + &|| Ok(()), ) .unwrap(); @@ -1271,6 +1298,7 @@ mod lazy_map_tests { Default::default(), &schema, &Default::default(), + &|| Ok(()), ) .unwrap(); @@ -1329,6 +1357,7 @@ mod lazy_map_tests { Default::default(), &schema, &Default::default(), + &|| Ok(()), ) .unwrap(); @@ -1534,6 +1563,7 @@ fn test_expand_all_fragments1() { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), + &|| Ok(()), ) .unwrap(); normalized_operation.named_fragments = Default::default(); diff --git a/apollo-federation/src/query_graph/graph_path.rs b/apollo-federation/src/query_graph/graph_path.rs index 936a7c465b9..705be32fe0a 100644 --- a/apollo-federation/src/query_graph/graph_path.rs +++ b/apollo-federation/src/query_graph/graph_path.rs @@ -2989,6 +2989,7 @@ impl OpGraphPath { context: &OpGraphPathContext, condition_resolver: &mut impl ConditionResolver, override_conditions: &EnabledOverrideConditions, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result<(Option>, Option), FederationError> { let span = debug_span!( "Trying to advance directly", @@ -3263,6 +3264,7 @@ impl OpGraphPath { &implementation_inline_fragment.into(), condition_resolver, override_conditions, + check_cancellation, )?; // If we find no options for that implementation, we bail (as we need to // simultaneously advance all implementations). @@ -3300,6 +3302,7 @@ impl OpGraphPath { operation_element, condition_resolver, override_conditions, + check_cancellation, )?; let Some(field_options_for_implementation) = field_options_for_implementation @@ -3339,6 +3342,7 @@ impl OpGraphPath { } let all_options = SimultaneousPaths::flat_cartesian_product( options_for_each_implementation, + check_cancellation, )?; if let Some(interface_path) = interface_path { let (interface_path, all_options) = @@ -3486,6 +3490,7 @@ impl OpGraphPath { &implementation_inline_fragment.into(), condition_resolver, override_conditions, + check_cancellation, )?; let Some(implementation_options) = implementation_options else { drop(guard); @@ -3512,6 +3517,7 @@ impl OpGraphPath { } let all_options = SimultaneousPaths::flat_cartesian_product( options_for_each_implementation, + check_cancellation, )?; debug!("Type-exploded options: {}", DisplaySlice(&all_options)); Ok((Some(all_options), None)) @@ -3778,6 +3784,7 @@ impl SimultaneousPaths { /// the options for the `SimultaneousPaths` as a whole. fn flat_cartesian_product( options_for_each_path: Vec>, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { // This can be written more tersely with a bunch of `reduce()`/`flat_map()`s and friends, // but when interfaces type-explode into many implementations, this can end up with fairly @@ -3808,6 +3815,7 @@ impl SimultaneousPaths { // Compute the cartesian product. for _ in 0..num_options { + check_cancellation()?; let num_simultaneous_paths = options_for_each_path .iter() .zip(&option_indexes) @@ -3973,6 +3981,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { operation_element: &OpPathElement, condition_resolver: &mut impl ConditionResolver, override_conditions: &EnabledOverrideConditions, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result>, FederationError> { debug!( "Trying to advance paths for operation: path = {}, operation = {operation_element}", @@ -3987,6 +3996,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { // references to `self`, which means cloning these paths when iterating. let paths = self.paths.0.clone(); for (path_index, path) in paths.iter().enumerate() { + check_cancellation()?; debug!("Computing options for {path}"); let span = debug_span!(" |"); let gaurd = span.enter(); @@ -4004,6 +4014,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { &updated_context, condition_resolver, override_conditions, + check_cancellation, )?; debug!("{advance_options:?}"); drop(gaurd); @@ -4076,6 +4087,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { &updated_context, condition_resolver, override_conditions, + check_cancellation, )?; // If we can't advance the operation element after that path, ignore it, // it's just not an option. @@ -4164,6 +4176,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { &updated_context, condition_resolver, override_conditions, + check_cancellation, )?; options = advance_options.unwrap_or_else(Vec::new); debug!("{options:?}"); @@ -4180,7 +4193,8 @@ impl SimultaneousPathsWithLazyIndirectPaths { } } - let all_options = SimultaneousPaths::flat_cartesian_product(options_for_each_path)?; + let all_options = + SimultaneousPaths::flat_cartesian_product(options_for_each_path, check_cancellation)?; debug!("{all_options:?}"); Ok(Some(self.create_lazy_options(all_options, updated_context))) } diff --git a/apollo-federation/src/query_graph/path_tree.rs b/apollo-federation/src/query_graph/path_tree.rs index 21a817fde6b..752c92ba732 100644 --- a/apollo-federation/src/query_graph/path_tree.rs +++ b/apollo-federation/src/query_graph/path_tree.rs @@ -715,9 +715,14 @@ mod tests { "Query(Test) --[t]--> T(Test) --[otherId]--> ID(Test)" ); - let normalized_operation = - normalize_operation(operation, Default::default(), &schema, &Default::default()) - .unwrap(); + let normalized_operation = normalize_operation( + operation, + Default::default(), + &schema, + &Default::default(), + &|| Ok(()), + ) + .unwrap(); let selection_set = Arc::new(normalized_operation.selection_set); let paths = vec![ diff --git a/apollo-federation/src/query_plan/fetch_dependency_graph.rs b/apollo-federation/src/query_plan/fetch_dependency_graph.rs index f004346f0bd..98213b9f19d 100644 --- a/apollo-federation/src/query_plan/fetch_dependency_graph.rs +++ b/apollo-federation/src/query_plan/fetch_dependency_graph.rs @@ -3488,6 +3488,7 @@ pub(crate) fn compute_nodes_for_tree( initial_node_path: FetchDependencyGraphNodePath, initial_defer_context: DeferContext, initial_conditions: &OpGraphPathContext, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { snapshot!("OpPathTree", initial_tree.to_string(), "path_tree"); let mut stack = vec![ComputeNodesStackItem { @@ -3500,6 +3501,7 @@ pub(crate) fn compute_nodes_for_tree( }]; let mut created_nodes = IndexSet::default(); while let Some(stack_item) = stack.pop() { + check_cancellation()?; let node = FetchDependencyGraph::node_weight_mut(&mut dependency_graph.graph, stack_item.node_id)?; for selection_set in &stack_item.tree.local_selection_sets { @@ -3545,6 +3547,7 @@ pub(crate) fn compute_nodes_for_tree( edge_id, new_context, &mut created_nodes, + check_cancellation, )?); } QueryGraphEdgeTransition::RootTypeResolution { root_kind } => { @@ -3572,6 +3575,7 @@ pub(crate) fn compute_nodes_for_tree( child, operation, &mut created_nodes, + check_cancellation, )?); } } @@ -3596,6 +3600,7 @@ fn compute_nodes_for_key_resolution<'a>( edge_id: EdgeIndex, new_context: &'a OpGraphPathContext, created_nodes: &mut IndexSet, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { let edge = stack_item.tree.graph.edge_weight(edge_id)?; let Some(conditions) = &child.conditions else { @@ -3611,6 +3616,7 @@ fn compute_nodes_for_key_resolution<'a>( stack_item.node_path.clone(), stack_item.defer_context.for_conditions(), &Default::default(), + check_cancellation, )?; created_nodes.extend(conditions_nodes.iter().copied()); // Then we can "take the edge", creating a new node. @@ -3849,6 +3855,7 @@ fn compute_nodes_for_op_path_element<'a>( child: &'a Arc>>, operation_element: &OpPathElement, created_nodes: &mut IndexSet, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { let Some(edge_id) = child.edge else { // A null edge means that the operation does nothing @@ -3955,6 +3962,7 @@ fn compute_nodes_for_op_path_element<'a>( }, &updated.defer_context, created_nodes, + check_cancellation, )?; if let Some(matching_context_ids) = &child.matching_context_ids { @@ -4434,6 +4442,7 @@ fn handle_conditions_tree( query_graph_edge_id_if_typename_needed: Option, defer_context: &DeferContext, created_nodes: &mut IndexSet, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result { // In many cases, we can optimize conditions by merging the fields into previously existing // nodes. However, we only do this when the current node has only a single parent (it's hard to @@ -4517,6 +4526,7 @@ fn handle_conditions_tree( fetch_node_path.clone(), defer_context.for_conditions(), &OpGraphPathContext::default(), + check_cancellation, )?; if newly_created_node_ids.is_empty() { diff --git a/apollo-federation/src/query_plan/query_planner.rs b/apollo-federation/src/query_plan/query_planner.rs index 6dea6364bbc..b39fea1e383 100644 --- a/apollo-federation/src/query_plan/query_planner.rs +++ b/apollo-federation/src/query_plan/query_planner.rs @@ -367,6 +367,11 @@ impl QueryPlanner { NamedFragments::new(&document.fragments, &self.api_schema), &self.api_schema, &self.interface_types_with_interface_objects, + &|| { + QueryPlanningParameters::check_cancellation_with( + &options.check_for_cooperative_cancellation, + ) + }, )?; let NormalizedDefer { @@ -573,6 +578,7 @@ fn compute_root_serial_dependency_graph( &mut fetch_dependency_graph, &prev_path, parameters.config.type_conditioned_fetching, + &|| parameters.check_cancellation(), )?; } else { // PORT_NOTE: It is unclear if they correct thing to do here is get the next ID, use @@ -611,6 +617,7 @@ pub(crate) fn compute_root_fetch_groups( dependency_graph: &mut FetchDependencyGraph, path: &OpPathTree, type_conditioned_fetching_enabled: bool, + check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, ) -> Result<(), FederationError> { // The root of the pathTree is one of the "fake" root of the subgraphs graph, // which belongs to no subgraph but points to each ones. @@ -660,6 +667,7 @@ pub(crate) fn compute_root_fetch_groups( )?, Default::default(), &Default::default(), + check_cancellation, )?; } Ok(()) diff --git a/apollo-federation/src/query_plan/query_planning_traversal.rs b/apollo-federation/src/query_plan/query_planning_traversal.rs index 50bb9705312..7b3616ad270 100644 --- a/apollo-federation/src/query_plan/query_planning_traversal.rs +++ b/apollo-federation/src/query_plan/query_planning_traversal.rs @@ -93,7 +93,13 @@ pub(crate) struct QueryPlanningParameters<'a> { impl QueryPlanningParameters<'_> { pub(crate) fn check_cancellation(&self) -> Result<(), SingleFederationError> { - if let Some(check) = self.check_for_cooperative_cancellation { + Self::check_cancellation_with(&self.check_for_cooperative_cancellation) + } + + pub(crate) fn check_cancellation_with( + check: &Option<&dyn Fn() -> ControlFlow<()>>, + ) -> Result<(), SingleFederationError> { + if let Some(check) = check { match check() { ControlFlow::Continue(()) => Ok(()), ControlFlow::Break(()) => Err(SingleFederationError::PlanningCancelled), @@ -417,11 +423,13 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> { ); for option in options.iter_mut() { + self.parameters.check_cancellation()?; let followups_for_option = option.advance_with_operation_element( self.parameters.supergraph_schema.clone(), &operation_element, /*resolver*/ self, &self.parameters.override_conditions, + &|| self.parameters.check_cancellation(), )?; let Some(followups_for_option) = followups_for_option else { // There is no valid way to advance the current operation element from this option @@ -1058,6 +1066,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> { dependency_graph, path_tree, type_conditioned_fetching_enabled, + &|| self.parameters.check_cancellation(), )?; } else { let query_graph_node = path_tree.graph.node_weight(path_tree.node)?; @@ -1095,6 +1104,7 @@ impl<'a: 'b, 'b> QueryPlanningTraversal<'a, 'b> { )?, Default::default(), &Default::default(), + &|| self.parameters.check_cancellation(), )?; } From a36b08e910821348fd1afef93ac0f5c5cd3fa380 Mon Sep 17 00:00:00 2001 From: "Sachin D. Shinde" Date: Fri, 28 Feb 2025 12:21:45 -0800 Subject: [PATCH 3/4] Add a changeset --- .changesets/fix_stance_intercom_body_director.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changesets/fix_stance_intercom_body_director.md diff --git a/.changesets/fix_stance_intercom_body_director.md b/.changesets/fix_stance_intercom_body_director.md new file mode 100644 index 00000000000..d6a79ac77c9 --- /dev/null +++ b/.changesets/fix_stance_intercom_body_director.md @@ -0,0 +1,5 @@ +### Ensure query planning exits when its request times out ([PR #6840](https://github.com/apollographql/router/pull/6840)) + +When a request times out/is cancelled, resources allocated to that request should be released. Previously, query planning could potentially keep running after the request is cancelled. After this fix, query planning now exits shortly after the request is cancelled. If you need query planning to run longer, you can [increase the request timeout](https://www.apollographql.com/docs/graphos/routing/performance/traffic-shaping#timeouts). + +By [@SimonSapin](https://github.com/SimonSapin) and [@sachindshinde](https://github.com/sachindshinde) in https://github.com/apollographql/router/pull/6840 \ No newline at end of file From 40728544d3537a9166b2f951ca38fafdee3c22e2 Mon Sep 17 00:00:00 2001 From: "Sachin D. Shinde" Date: Mon, 3 Mar 2025 12:56:18 -0800 Subject: [PATCH 4/4] Address feedback (switch to dyn, and use named function for no-op check cancellation) --- apollo-federation/src/operation/mod.rs | 10 ++-- apollo-federation/src/operation/tests/mod.rs | 57 +++++++++++-------- .../src/query_graph/graph_path.rs | 6 +- .../src/query_graph/path_tree.rs | 3 +- .../src/query_plan/fetch_dependency_graph.rs | 8 +-- .../src/query_plan/query_planner.rs | 2 +- 6 files changed, 48 insertions(+), 38 deletions(-) diff --git a/apollo-federation/src/operation/mod.rs b/apollo-federation/src/operation/mod.rs index 3e05bb025af..a6dd0090ca1 100644 --- a/apollo-federation/src/operation/mod.rs +++ b/apollo-federation/src/operation/mod.rs @@ -64,6 +64,8 @@ pub(crate) use contains::*; pub(crate) use directive_list::DirectiveList; pub(crate) use merging::*; pub(crate) use rebase::*; +#[cfg(test)] +pub(crate) use tests::never_cancel; pub(crate) const TYPENAME_FIELD: Name = name!("__typename"); @@ -1543,7 +1545,7 @@ impl SelectionSet { pub(crate) fn expand_all_fragments( &self, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result { let mut expanded_selections = vec![]; SelectionSet::expand_selection_set(&mut expanded_selections, self, check_cancellation)?; @@ -1560,7 +1562,7 @@ impl SelectionSet { fn expand_selection_set( destination: &mut Vec, selection_set: &SelectionSet, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result<(), FederationError> { for value in selection_set.selections.values() { check_cancellation()?; @@ -2725,7 +2727,7 @@ impl InlineFragmentSelection { pub(crate) fn from_fragment_spread_selection( parent_type_position: CompositeTypeDefinitionPosition, fragment_spread_selection: &Arc, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result { let schema = fragment_spread_selection.spread.schema.schema(); for directive in fragment_spread_selection.spread.directives.iter() { @@ -3759,7 +3761,7 @@ pub(crate) fn normalize_operation( named_fragments: NamedFragments, schema: &ValidFederationSchema, interface_types_with_interface_objects: &IndexSet, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result { let mut normalized_selection_set = SelectionSet::from_selection_set(&operation.selection_set, &named_fragments, schema)?; diff --git a/apollo-federation/src/operation/tests/mod.rs b/apollo-federation/src/operation/tests/mod.rs index 6ade590ca6b..b3ad0a69b0d 100644 --- a/apollo-federation/src/operation/tests/mod.rs +++ b/apollo-federation/src/operation/tests/mod.rs @@ -13,6 +13,7 @@ use super::SelectionKey; use super::SelectionSet; use super::normalize_operation; use crate::error::FederationError; +use crate::error::SingleFederationError; use crate::query_graph::graph_path::OpPathElement; use crate::schema::ValidFederationSchema; use crate::schema::position::InterfaceTypeDefinitionPosition; @@ -65,10 +66,16 @@ pub(super) fn parse_and_expand( fragments, schema, &Default::default(), - &|| Ok(()), + &never_cancel, ) } +/// The `normalize_operation()` function has a `check_cancellation` parameter that we'll want to +/// configure to never cancel during tests. We create a convenience function here for that purpose. +pub(crate) fn never_cancel() -> Result<(), SingleFederationError> { + Ok(()) +} + #[test] fn expands_named_fragments() { let operation_with_named_fragment = r#" @@ -106,7 +113,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); normalized_operation.named_fragments = Default::default(); @@ -161,7 +168,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); normalized_operation.named_fragments = Default::default(); @@ -204,7 +211,7 @@ type Query { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); @@ -240,7 +247,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -284,7 +291,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -331,7 +338,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -376,7 +383,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -423,7 +430,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) { @@ -475,7 +482,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -542,7 +549,7 @@ type V { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -602,7 +609,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -649,7 +656,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -700,7 +707,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -749,7 +756,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skipIf: Boolean!) { @@ -798,7 +805,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) { @@ -851,7 +858,7 @@ type T { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -920,7 +927,7 @@ type V { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query Test { @@ -967,7 +974,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query TestQuery { @@ -1007,7 +1014,7 @@ type Foo { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query TestQuery { @@ -1058,7 +1065,7 @@ scalar FieldSet NamedFragments::new(&executable_document.fragments, &schema), &schema, &interface_objects, - &|| Ok(()), + &never_cancel, ) .unwrap(); let expected = r#"query TestQuery { @@ -1198,7 +1205,7 @@ mod make_selection_tests { Default::default(), &schema, &Default::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); @@ -1298,7 +1305,7 @@ mod lazy_map_tests { Default::default(), &schema, &Default::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); @@ -1357,7 +1364,7 @@ mod lazy_map_tests { Default::default(), &schema, &Default::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); @@ -1563,7 +1570,7 @@ fn test_expand_all_fragments1() { NamedFragments::new(&executable_document.fragments, &schema), &schema, &IndexSet::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); normalized_operation.named_fragments = Default::default(); diff --git a/apollo-federation/src/query_graph/graph_path.rs b/apollo-federation/src/query_graph/graph_path.rs index 705be32fe0a..fb47334321d 100644 --- a/apollo-federation/src/query_graph/graph_path.rs +++ b/apollo-federation/src/query_graph/graph_path.rs @@ -2989,7 +2989,7 @@ impl OpGraphPath { context: &OpGraphPathContext, condition_resolver: &mut impl ConditionResolver, override_conditions: &EnabledOverrideConditions, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result<(Option>, Option), FederationError> { let span = debug_span!( "Trying to advance directly", @@ -3784,7 +3784,7 @@ impl SimultaneousPaths { /// the options for the `SimultaneousPaths` as a whole. fn flat_cartesian_product( options_for_each_path: Vec>, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { // This can be written more tersely with a bunch of `reduce()`/`flat_map()`s and friends, // but when interfaces type-explode into many implementations, this can end up with fairly @@ -3981,7 +3981,7 @@ impl SimultaneousPathsWithLazyIndirectPaths { operation_element: &OpPathElement, condition_resolver: &mut impl ConditionResolver, override_conditions: &EnabledOverrideConditions, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result>, FederationError> { debug!( "Trying to advance paths for operation: path = {}, operation = {operation_element}", diff --git a/apollo-federation/src/query_graph/path_tree.rs b/apollo-federation/src/query_graph/path_tree.rs index 752c92ba732..21311355a48 100644 --- a/apollo-federation/src/query_graph/path_tree.rs +++ b/apollo-federation/src/query_graph/path_tree.rs @@ -581,6 +581,7 @@ mod tests { use crate::error::FederationError; use crate::operation::Field; + use crate::operation::never_cancel; use crate::operation::normalize_operation; use crate::query_graph::QueryGraph; use crate::query_graph::QueryGraphEdgeTransition; @@ -720,7 +721,7 @@ mod tests { Default::default(), &schema, &Default::default(), - &|| Ok(()), + &never_cancel, ) .unwrap(); let selection_set = Arc::new(normalized_operation.selection_set); diff --git a/apollo-federation/src/query_plan/fetch_dependency_graph.rs b/apollo-federation/src/query_plan/fetch_dependency_graph.rs index 98213b9f19d..010803a4a76 100644 --- a/apollo-federation/src/query_plan/fetch_dependency_graph.rs +++ b/apollo-federation/src/query_plan/fetch_dependency_graph.rs @@ -3488,7 +3488,7 @@ pub(crate) fn compute_nodes_for_tree( initial_node_path: FetchDependencyGraphNodePath, initial_defer_context: DeferContext, initial_conditions: &OpGraphPathContext, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { snapshot!("OpPathTree", initial_tree.to_string(), "path_tree"); let mut stack = vec![ComputeNodesStackItem { @@ -3600,7 +3600,7 @@ fn compute_nodes_for_key_resolution<'a>( edge_id: EdgeIndex, new_context: &'a OpGraphPathContext, created_nodes: &mut IndexSet, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { let edge = stack_item.tree.graph.edge_weight(edge_id)?; let Some(conditions) = &child.conditions else { @@ -3855,7 +3855,7 @@ fn compute_nodes_for_op_path_element<'a>( child: &'a Arc>>, operation_element: &OpPathElement, created_nodes: &mut IndexSet, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result, FederationError> { let Some(edge_id) = child.edge else { // A null edge means that the operation does nothing @@ -4442,7 +4442,7 @@ fn handle_conditions_tree( query_graph_edge_id_if_typename_needed: Option, defer_context: &DeferContext, created_nodes: &mut IndexSet, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result { // In many cases, we can optimize conditions by merging the fields into previously existing // nodes. However, we only do this when the current node has only a single parent (it's hard to diff --git a/apollo-federation/src/query_plan/query_planner.rs b/apollo-federation/src/query_plan/query_planner.rs index b39fea1e383..064086575fb 100644 --- a/apollo-federation/src/query_plan/query_planner.rs +++ b/apollo-federation/src/query_plan/query_planner.rs @@ -617,7 +617,7 @@ pub(crate) fn compute_root_fetch_groups( dependency_graph: &mut FetchDependencyGraph, path: &OpPathTree, type_conditioned_fetching_enabled: bool, - check_cancellation: &impl Fn() -> Result<(), SingleFederationError>, + check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>, ) -> Result<(), FederationError> { // The root of the pathTree is one of the "fake" root of the subgraphs graph, // which belongs to no subgraph but points to each ones.