Skip to content

Commit

Permalink
[FLINK-32859][scheduler] Introduce the StateWithoutExecutionGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui authored and XComp committed Aug 23, 2023
1 parent 4dd0912 commit 4aebc43
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 435 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,77 +19,32 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Initial state of the {@link AdaptiveScheduler}. */
class Created implements State {
class Created extends StateWithoutExecutionGraph {

private final Context context;

private final Logger logger;

Created(Context context, Logger logger) {
super(context, logger);
this.context = context;
this.logger = logger;
}

@Override
public void cancel() {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null));
}

@Override
public void suspend(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause));
}

@Override
public JobStatus getJobStatus() {
return JobStatus.INITIALIZING;
}

@Override
public ArchivedExecutionGraph getJob() {
return context.getArchivedExecutionGraph(getJobStatus(), null);
}

@Override
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause));
}

@Override
public Logger getLogger() {
return logger;
}

/** Starts the scheduling by going into the {@link WaitingForResources} state. */
void startScheduling() {
context.goToWaitingForResources(null);
}

/** Context of the {@link Created} state. */
interface Context extends StateTransitions.ToFinished, StateTransitions.ToWaitingForResources {

/**
* Creates an {@link ArchivedExecutionGraph} for the given jobStatus and failure cause.
*
* @param jobStatus jobStatus to create the {@link ArchivedExecutionGraph} with
* @param cause cause represents the failure cause for the {@link ArchivedExecutionGraph};
* {@code null} if there is no failure cause
* @return the created {@link ArchivedExecutionGraph}
*/
ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause);
}
interface Context
extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {}

static class Factory implements StateFactory<Created> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
Expand All @@ -43,7 +42,6 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -56,10 +54,9 @@
* If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link
* Executing}.
*/
public class CreatingExecutionGraph implements State {
public class CreatingExecutionGraph extends StateWithoutExecutionGraph {

private final Context context;
private final Logger logger;
private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory;

private final @Nullable ExecutionGraph previousExecutionGraph;
Expand All @@ -71,8 +68,8 @@ public CreatingExecutionGraph(
Logger logger,
OperatorCoordinatorHandlerFactory operatorCoordinatorFactory,
ExecutionGraph previousExecutionGraph1) {
super(context, logger);
this.context = context;
this.logger = logger;
this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory;

FutureUtils.assertNoException(
Expand All @@ -93,11 +90,12 @@ private void handleExecutionGraphCreation(
@Nullable ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism,
@Nullable Throwable throwable) {
if (throwable != null) {
logger.info(
"Failed to go from {} to {} because the ExecutionGraph creation failed.",
CreatingExecutionGraph.class.getSimpleName(),
Executing.class.getSimpleName(),
throwable);
getLogger()
.info(
"Failed to go from {} to {} because the ExecutionGraph creation failed.",
CreatingExecutionGraph.class.getSimpleName(),
Executing.class.getSimpleName(),
throwable);
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable));
} else {
for (ExecutionVertex vertex :
Expand All @@ -109,8 +107,9 @@ private void handleExecutionGraphCreation(
context.tryToAssignSlots(executionGraphWithVertexParallelism);

if (result.isSuccess()) {
logger.debug(
"Successfully reserved and assigned the required slots for the ExecutionGraph.");
getLogger()
.debug(
"Successfully reserved and assigned the required slots for the ExecutionGraph.");
final ExecutionGraph executionGraph = result.getExecutionGraph();
final ExecutionGraphHandler executionGraphHandler =
new ExecutionGraphHandler(
Expand Down Expand Up @@ -144,62 +143,26 @@ private void handleExecutionGraphCreation(
operatorCoordinatorHandler,
Collections.emptyList());
} else {
logger.debug(
"Failed to reserve and assign the required slots. Waiting for new resources.");
getLogger()
.debug(
"Failed to reserve and assign the required slots. Waiting for new resources.");
context.goToWaitingForResources(previousExecutionGraph);
}
}
}

@Override
public void cancel() {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null));
}

@Override
public void suspend(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause));
}

@Override
public JobStatus getJobStatus() {
return JobStatus.CREATED;
}

@Override
public ArchivedExecutionGraph getJob() {
return context.getArchivedExecutionGraph(getJobStatus(), null);
}

@Override
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause));
}

@Override
public Logger getLogger() {
return logger;
}

/** Context for the {@link CreatingExecutionGraph} state. */
interface Context
extends GlobalFailureHandler,
extends StateWithoutExecutionGraph.Context,
GlobalFailureHandler,
StateTransitions.ToExecuting,
StateTransitions.ToFinished,
StateTransitions.ToWaitingForResources {

/**
* Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can
* be null if there is no failure.
*
* @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with
* @param cause cause describing a failure cause; {@code null} if there is none
* @return the created {@link ArchivedExecutionGraph}
*/
ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause);

/**
* Runs the given action after a delay if the state at this time equals the expected state.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Abstract state class which contains its {@link Context} and {@link #logger} to execute common
* operations.
*/
abstract class StateWithoutExecutionGraph implements State {

private final Context context;

private final Logger logger;

StateWithoutExecutionGraph(Context context, Logger logger) {
this.context = context;
this.logger = logger;
}

@Override
public void cancel() {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null));
}

@Override
public void suspend(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause));
}

@Override
public ArchivedExecutionGraph getJob() {
return context.getArchivedExecutionGraph(getJobStatus(), null);
}

@Override
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause));
}

@Override
public Logger getLogger() {
return logger;
}

/** Context of the {@link StateWithoutExecutionGraph} state. */
interface Context extends StateTransitions.ToFinished {

/**
* Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can
* be null if there is no failure.
*
* @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with
* @param cause cause describing a failure cause; {@code null} if there is none
* @return the created {@link ArchivedExecutionGraph}
*/
ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause);
}
}
Loading

0 comments on commit 4aebc43

Please sign in to comment.