diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java index 3cbb51edb63b7..041477740746f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java @@ -19,35 +19,17 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.slf4j.Logger; -import javax.annotation.Nullable; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; - /** Initial state of the {@link AdaptiveScheduler}. */ -class Created implements State { +class Created extends StateWithoutExecutionGraph { private final Context context; - private final Logger logger; - Created(Context context, Logger logger) { + super(context, logger); this.context = context; - this.logger = logger; - } - - @Override - public void cancel() { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); - } - - @Override - public void suspend(Throwable cause) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); } @Override @@ -55,41 +37,14 @@ public JobStatus getJobStatus() { return JobStatus.INITIALIZING; } - @Override - public ArchivedExecutionGraph getJob() { - return context.getArchivedExecutionGraph(getJobStatus(), null); - } - - @Override - public void handleGlobalFailure( - Throwable cause, CompletableFuture> failureLabels) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); - } - - @Override - public Logger getLogger() { - return logger; - } - /** Starts the scheduling by going into the {@link WaitingForResources} state. */ void startScheduling() { context.goToWaitingForResources(null); } /** Context of the {@link Created} state. */ - interface Context extends StateTransitions.ToFinished, StateTransitions.ToWaitingForResources { - - /** - * Creates an {@link ArchivedExecutionGraph} for the given jobStatus and failure cause. - * - * @param jobStatus jobStatus to create the {@link ArchivedExecutionGraph} with - * @param cause cause represents the failure cause for the {@link ArchivedExecutionGraph}; - * {@code null} if there is no failure cause - * @return the created {@link ArchivedExecutionGraph} - */ - ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause); - } + interface Context + extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {} static class Factory implements StateFactory { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index c876fe6ad1d83..da90ef1468d1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -43,7 +42,6 @@ import java.time.Duration; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -56,10 +54,9 @@ * If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link * Executing}. */ -public class CreatingExecutionGraph implements State { +public class CreatingExecutionGraph extends StateWithoutExecutionGraph { private final Context context; - private final Logger logger; private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory; private final @Nullable ExecutionGraph previousExecutionGraph; @@ -71,8 +68,8 @@ public CreatingExecutionGraph( Logger logger, OperatorCoordinatorHandlerFactory operatorCoordinatorFactory, ExecutionGraph previousExecutionGraph1) { + super(context, logger); this.context = context; - this.logger = logger; this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory; FutureUtils.assertNoException( @@ -93,11 +90,12 @@ private void handleExecutionGraphCreation( @Nullable ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism, @Nullable Throwable throwable) { if (throwable != null) { - logger.info( - "Failed to go from {} to {} because the ExecutionGraph creation failed.", - CreatingExecutionGraph.class.getSimpleName(), - Executing.class.getSimpleName(), - throwable); + getLogger() + .info( + "Failed to go from {} to {} because the ExecutionGraph creation failed.", + CreatingExecutionGraph.class.getSimpleName(), + Executing.class.getSimpleName(), + throwable); context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable)); } else { for (ExecutionVertex vertex : @@ -109,8 +107,9 @@ private void handleExecutionGraphCreation( context.tryToAssignSlots(executionGraphWithVertexParallelism); if (result.isSuccess()) { - logger.debug( - "Successfully reserved and assigned the required slots for the ExecutionGraph."); + getLogger() + .debug( + "Successfully reserved and assigned the required slots for the ExecutionGraph."); final ExecutionGraph executionGraph = result.getExecutionGraph(); final ExecutionGraphHandler executionGraphHandler = new ExecutionGraphHandler( @@ -144,62 +143,26 @@ private void handleExecutionGraphCreation( operatorCoordinatorHandler, Collections.emptyList()); } else { - logger.debug( - "Failed to reserve and assign the required slots. Waiting for new resources."); + getLogger() + .debug( + "Failed to reserve and assign the required slots. Waiting for new resources."); context.goToWaitingForResources(previousExecutionGraph); } } } - @Override - public void cancel() { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); - } - - @Override - public void suspend(Throwable cause) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); - } - @Override public JobStatus getJobStatus() { return JobStatus.CREATED; } - @Override - public ArchivedExecutionGraph getJob() { - return context.getArchivedExecutionGraph(getJobStatus(), null); - } - - @Override - public void handleGlobalFailure( - Throwable cause, CompletableFuture> failureLabels) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); - } - - @Override - public Logger getLogger() { - return logger; - } - /** Context for the {@link CreatingExecutionGraph} state. */ interface Context - extends GlobalFailureHandler, + extends StateWithoutExecutionGraph.Context, + GlobalFailureHandler, StateTransitions.ToExecuting, - StateTransitions.ToFinished, StateTransitions.ToWaitingForResources { - /** - * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can - * be null if there is no failure. - * - * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with - * @param cause cause describing a failure cause; {@code null} if there is none - * @return the created {@link ArchivedExecutionGraph} - */ - ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause); - /** * Runs the given action after a delay if the state at this time equals the expected state. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java new file mode 100644 index 0000000000000..16769e310df3f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Abstract state class which contains its {@link Context} and {@link #logger} to execute common + * operations. + */ +abstract class StateWithoutExecutionGraph implements State { + + private final Context context; + + private final Logger logger; + + StateWithoutExecutionGraph(Context context, Logger logger) { + this.context = context; + this.logger = logger; + } + + @Override + public void cancel() { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); + } + + @Override + public void suspend(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); + } + + @Override + public ArchivedExecutionGraph getJob() { + return context.getArchivedExecutionGraph(getJobStatus(), null); + } + + @Override + public void handleGlobalFailure( + Throwable cause, CompletableFuture> failureLabels) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); + } + + @Override + public Logger getLogger() { + return logger; + } + + /** Context of the {@link StateWithoutExecutionGraph} state. */ + interface Context extends StateTransitions.ToFinished { + + /** + * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can + * be null if there is no failure. + * + * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with + * @param cause cause describing a failure cause; {@code null} if there is none + * @return the created {@link ArchivedExecutionGraph} + */ + ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 434b0741a28cd..d7933984db3eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.util.Preconditions; import org.apache.flink.util.clock.Clock; @@ -32,19 +31,15 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; /** * State which describes that the scheduler is waiting for resources in order to execute the job. */ -class WaitingForResources implements State, ResourceListener { +class WaitingForResources extends StateWithoutExecutionGraph implements ResourceListener { private final Context context; - private final Logger log; - private final Clock clock; /** If set, there's an ongoing deadline waiting for a resource stabilization. */ @@ -78,8 +73,8 @@ class WaitingForResources implements State, ResourceListener { Duration resourceStabilizationTimeout, Clock clock, @Nullable ExecutionGraph previousExecutionGraph) { + super(context, log); this.context = Preconditions.checkNotNull(context); - this.log = Preconditions.checkNotNull(log); this.resourceStabilizationTimeout = Preconditions.checkNotNull(resourceStabilizationTimeout); this.clock = clock; @@ -106,37 +101,11 @@ public void onLeave(Class newState) { } } - @Override - public void cancel() { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); - } - - @Override - public void suspend(Throwable cause) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); - } - @Override public JobStatus getJobStatus() { return JobStatus.CREATED; } - @Override - public ArchivedExecutionGraph getJob() { - return context.getArchivedExecutionGraph(getJobStatus(), null); - } - - @Override - public void handleGlobalFailure( - Throwable cause, CompletableFuture> failureLabels) { - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); - } - - @Override - public Logger getLogger() { - return log; - } - @Override public void onNewResourcesAvailable() { checkDesiredOrSufficientResourcesAvailable(); @@ -174,8 +143,9 @@ private void checkDesiredOrSufficientResourcesAvailable() { } private void resourceTimeout() { - log.debug( - "Initial resource allocation timeout triggered: Creating ExecutionGraph with available resources."); + getLogger() + .debug( + "Initial resource allocation timeout triggered: Creating ExecutionGraph with available resources."); createExecutionGraphWithAvailableResources(); } @@ -185,18 +155,7 @@ private void createExecutionGraphWithAvailableResources() { /** Context of the {@link WaitingForResources} state. */ interface Context - extends StateTransitions.ToCreatingExecutionGraph, StateTransitions.ToFinished { - - /** - * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can - * be null if there is no failure. - * - * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with - * @param cause cause describing a failure cause; {@code null} if there is none - * @return the created {@link ArchivedExecutionGraph} - */ - ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause); + extends StateWithoutExecutionGraph.Context, StateTransitions.ToCreatingExecutionGraph { /** * Checks whether we have the desired resources. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java index 302fb471c3e6b..9a13f5deb6eed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java @@ -18,15 +18,11 @@ package org.apache.flink.runtime.scheduler.adaptive; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.failure.FailureEnricherUtils; -import org.apache.flink.util.FlinkException; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -34,8 +30,6 @@ import javax.annotation.Nullable; -import java.util.function.Consumer; - import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link Created} state. */ @@ -45,18 +39,6 @@ class CreatedTest { @RegisterExtension MockCreatedContext ctx = new MockCreatedContext(); - @Test - void testCancel() { - Created created = new Created(ctx, LOG); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); - }); - created.cancel(); - } - @Test void testStartScheduling() { Created created = new Created(ctx, LOG); @@ -66,46 +48,6 @@ void testStartScheduling() { created.startScheduling(); } - @Test - void testSuspend() { - FlinkException expectedException = new FlinkException("This is a test exception"); - Created created = new Created(ctx, LOG); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - created.suspend(expectedException); - } - - @Test - void testFailure() { - Created created = new Created(ctx, LOG); - RuntimeException expectedException = new RuntimeException("This is a test exception"); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - created.handleGlobalFailure(expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } - @Test void testJobInformation() { Created created = new Created(ctx, LOG); @@ -113,32 +55,15 @@ void testJobInformation() { assertThat(job.getState()).isEqualTo(JobStatus.INITIALIZING); } - static class MockCreatedContext implements Created.Context, AfterEachCallback { - private final StateValidator finishedStateValidator = - new StateValidator<>("finished"); + static class MockCreatedContext extends MockStateWithoutExecutionGraphContext + implements Created.Context { private final StateValidator waitingForResourcesStateValidator = new StateValidator<>("WaitingForResources"); - public void setExpectFinished(Consumer asserter) { - finishedStateValidator.expectInput(asserter); - } - public void setExpectWaitingForResources() { waitingForResourcesStateValidator.expectInput((none) -> {}); } - @Override - public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { - finishedStateValidator.validateInput(archivedExecutionGraph); - } - - @Override - public ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - new JobID(), "testJob", jobStatus, cause, null, 0L); - } - @Override public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); @@ -146,7 +71,7 @@ public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGr @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - finishedStateValidator.close(); + super.afterEach(extensionContext); waitingForResourcesStateValidator.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index 833835b7d22d8..b831b3bb62f90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -18,14 +18,11 @@ package org.apache.flink.runtime.scheduler.adaptive; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.CompletedScheduledFuture; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.failure.FailureEnricherUtils; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; @@ -36,7 +33,6 @@ import org.apache.flink.util.concurrent.Executors; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -63,79 +59,6 @@ class CreatingExecutionGraphTest { @RegisterExtension MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext(); - @Test - void testCancelTransitionsToFinished() { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - LOG, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - context.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); - }); - creatingExecutionGraph.cancel(); - } - - @Test - void testSuspendTransitionsToFinished() { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - LOG, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - FlinkException expectedException = new FlinkException("This is a test exception"); - - context.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - creatingExecutionGraph.suspend(expectedException); - } - - @Test - void testGlobalFailureTransitionsToFinished() { - final CreatingExecutionGraph creatingExecutionGraph = - new CreatingExecutionGraph( - context, - new CompletableFuture<>(), - LOG, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, - null); - - RuntimeException expectedException = new RuntimeException("This is a test exception"); - - context.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - creatingExecutionGraph.handleGlobalFailure( - expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } - @Test void testFailedExecutionGraphCreationTransitionsToFinished() { final CompletableFuture @@ -235,10 +158,8 @@ private static OperatorCoordinatorHandler createTestingOperatorCoordinatorHandle return new TestingOperatorCoordinatorHandler(); } - static class MockCreatingExecutionGraphContext - implements CreatingExecutionGraph.Context, AfterEachCallback { - private final StateValidator finishedStateValidator = - new StateValidator<>("Finished"); + static class MockCreatingExecutionGraphContext extends MockStateWithoutExecutionGraphContext + implements CreatingExecutionGraph.Context { private final StateValidator waitingForResourcesStateValidator = new StateValidator<>("WaitingForResources"); private final StateValidator executingStateValidator = @@ -255,12 +176,6 @@ static class MockCreatingExecutionGraphContext // No-op. }; - private boolean hasStateTransition = false; - - public void setExpectFinished(Consumer asserter) { - finishedStateValidator.expectInput(asserter); - } - public void setExpectWaitingForResources() { waitingForResourcesStateValidator.expectInput((none) -> {}); } @@ -281,12 +196,6 @@ public void setGlobalFailureHandler(GlobalFailureHandler globalFailureHandler) { this.globalFailureHandler = globalFailureHandler; } - @Override - public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { - finishedStateValidator.validateInput(archivedExecutionGraph); - registerStateTransition(); - } - @Override public void goToExecuting( ExecutionGraph executionGraph, @@ -297,13 +206,6 @@ public void goToExecuting( registerStateTransition(); } - @Override - public ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - new JobID(), "testJob", jobStatus, cause, null, 0L); - } - @Override public ScheduledFuture runIfState(State expectedState, Runnable action, Duration delay) { if (!hasStateTransition()) { @@ -328,7 +230,7 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots( @Override public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); - hasStateTransition = true; + registerStateTransition(); } @Override @@ -348,18 +250,10 @@ public JobManagerJobMetricGroup getMetricGroup() { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - finishedStateValidator.close(); + super.afterEach(extensionContext); waitingForResourcesStateValidator.close(); executingStateValidator.close(); } - - public boolean hasStateTransition() { - return hasStateTransition; - } - - public void registerStateTransition() { - hasStateTransition = true; - } } private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism getGraph( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java new file mode 100644 index 0000000000000..75dbab9a804d9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; + +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.util.function.Consumer; + +/** Mock the {@link StateWithoutExecutionGraph.Context}. */ +class MockStateWithoutExecutionGraphContext + implements StateWithoutExecutionGraph.Context, AfterEachCallback { + + private final StateValidator finishedStateValidator = + new StateValidator<>("Finished"); + + private boolean hasStateTransition = false; + + public void setExpectFinished(Consumer asserter) { + finishedStateValidator.expectInput(asserter); + } + + @Override + public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { + finishedStateValidator.validateInput(archivedExecutionGraph); + registerStateTransition(); + } + + @Override + public ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause) { + return new ArchivedExecutionGraphBuilder() + .setState(jobStatus) + .setFailureCause(cause == null ? null : new ErrorInfo(cause, 1337)) + .build(); + } + + @Override + public void afterEach(ExtensionContext extensionContext) throws Exception { + finishedStateValidator.close(); + } + + public boolean hasStateTransition() { + return hasStateTransition; + } + + public void registerStateTransition() { + hasStateTransition = true; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java new file mode 100644 index 0000000000000..349371180a0f6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.failure.FailureEnricherUtils; +import org.apache.flink.util.FlinkException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link StateWithoutExecutionGraph} state. */ +public class StateWithoutExecutionGraphTest { + + private static final Logger LOG = LoggerFactory.getLogger(CreatedTest.class); + + @RegisterExtension + MockStateWithoutExecutionGraphContext ctx = new MockStateWithoutExecutionGraphContext(); + + @Test + void testCancelTransitionsToFinished() { + TestingStateWithoutExecutionGraph state = new TestingStateWithoutExecutionGraph(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); + }); + state.cancel(); + } + + @Test + void testSuspendTransitionsToFinished() { + FlinkException expectedException = new FlinkException("This is a test exception"); + TestingStateWithoutExecutionGraph state = new TestingStateWithoutExecutionGraph(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + state.suspend(expectedException); + } + + @Test + void testTransitionToFinishedOnGlobalFailure() { + TestingStateWithoutExecutionGraph state = new TestingStateWithoutExecutionGraph(ctx, LOG); + RuntimeException expectedException = new RuntimeException("This is a test exception"); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat( + archivedExecutionGraph + .getFailureInfo() + .getException() + .deserializeError(this.getClass().getClassLoader())) + .isEqualTo(expectedException); + }); + + state.handleGlobalFailure(expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); + } + + private static final class TestingStateWithoutExecutionGraph + extends StateWithoutExecutionGraph { + + TestingStateWithoutExecutionGraph(Context context, Logger logger) { + super(context, logger); + } + + @Override + public JobStatus getJobStatus() { + return null; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index bdcc73f5c9d45..9c7aab6901b54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -18,19 +18,12 @@ package org.apache.flink.runtime.scheduler.adaptive; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.ScheduledTask; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.failure.FailureEnricherUtils; -import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.ManualClock; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -215,65 +208,6 @@ void testStateTransitionOnResourceTimeout() { ctx.runScheduledTasks(); } - @Test - void testTransitionToFinishedOnGlobalFailure() { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); - RuntimeException expectedException = new RuntimeException("This is a test exception"); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - wfr.handleGlobalFailure(expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS); - } - - @Test - void testCancel() { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNull(); - }); - wfr.cancel(); - } - - @Test - void testSuspend() { - ctx.setHasDesiredResources(() -> false); - WaitingForResources wfr = - new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT); - - FlinkException expectedException = new FlinkException("This is a test exception"); - - ctx.setExpectFinished( - archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED); - assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); - assertThat( - archivedExecutionGraph - .getFailureInfo() - .getException() - .deserializeError(this.getClass().getClassLoader())) - .isEqualTo(expectedException); - }); - - wfr.suspend(expectedException); - } - @Test void testInternalRunScheduledTasks_correctExecutionOrder() { AtomicBoolean firstRun = new AtomicBoolean(false); @@ -352,14 +286,13 @@ void testInternalRunScheduledTasks_scheduleTaskFromRunnable() { assertThat(executed).isTrue(); } - private static class MockContext implements WaitingForResources.Context, AfterEachCallback { + private static class MockContext extends MockStateWithoutExecutionGraphContext + implements WaitingForResources.Context { private static final Logger LOG = LoggerFactory.getLogger(MockContext.class); private final StateValidator creatingExecutionGraphStateValidator = new StateValidator<>("executing"); - private final StateValidator finishedStateValidator = - new StateValidator<>("finished"); private Supplier hasDesiredResourcesSupplier = () -> false; private Supplier hasSufficientResourcesSupplier = () -> false; @@ -367,7 +300,6 @@ private static class MockContext implements WaitingForResources.Context, AfterEa private final Queue> scheduledTasks = new PriorityQueue<>( Comparator.comparingLong(o -> o.getDelay(TimeUnit.MILLISECONDS))); - private boolean hasStateTransition = false; private final ManualTestTime testTime = new ManualTestTime( @@ -382,10 +314,6 @@ public void setHasSufficientResources(Supplier sup) { hasSufficientResourcesSupplier = sup; } - void setExpectFinished(Consumer asserter) { - finishedStateValidator.expectInput(asserter); - } - void setExpectCreatingExecutionGraph() { creatingExecutionGraphStateValidator.expectInput(none -> {}); } @@ -414,17 +342,8 @@ void runScheduledTasks() { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { + super.afterEach(extensionContext); creatingExecutionGraphStateValidator.close(); - finishedStateValidator.close(); - } - - @Override - public ArchivedExecutionGraph getArchivedExecutionGraph( - JobStatus jobStatus, @Nullable Throwable cause) { - return new ArchivedExecutionGraphBuilder() - .setState(jobStatus) - .setFailureCause(cause == null ? null : new ErrorInfo(cause, 1337)) - .build(); } @Override @@ -459,26 +378,12 @@ public ScheduledFuture runIfState(State expectedState, Runnable action, Durat return scheduledTask; } - @Override - public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { - finishedStateValidator.validateInput(archivedExecutionGraph); - registerStateTransition(); - } - @Override public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { creatingExecutionGraphStateValidator.validateInput(null); registerStateTransition(); } - public boolean hasStateTransition() { - return hasStateTransition; - } - - public void registerStateTransition() { - hasStateTransition = true; - } - public Clock getClock() { return testTime.getClock(); }