[FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode (apache#24303)
ferenc-csaky authored Apr 20, 2024
1 parent 7c4dec6 commit b3fdb07
Showing 18 changed files with 728 additions and 53 deletions.
8 changes: 5 additions & 3 deletions docs/content.zh/docs/deployment/config.md
@@ -402,11 +402,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----

# Artifact Fetching
# User Artifact Management

Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
Flink is capable of uploading and fetching local user artifacts in Application Mode. An artifact can be the actual job archive, a separately packaged UDF, etc.
1. Uploading local artifacts to a DFS is a Kubernetes-specific feature; see the [Kubernetes](#kubernetes) section and look for the `kubernetes.artifacts.*` prefixed options.
2. Fetching remote artifacts on the deployed application cluster is supported from a DFS or an HTTP(S) endpoint.
{{< hint info >}}
**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
**Note:** Artifact Fetching is supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}

{{< generated/artifact_fetch_configuration >}}
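For example, a single submission can exercise both halves of the feature: a local job JAR is uploaded to the configured DFS target, while a remote artifact is fetched on the deployed cluster. A minimal sketch of such a submission (cluster ID and image options omitted for brevity), assuming a hypothetical writable S3 bucket `my-bucket` and a reachable HTTPS endpoint:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    -Duser.artifacts.artifact-list=https://example.com/my-udf.jar \
    local:///tmp/my-flink-job.jar
```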
@@ -85,6 +85,9 @@ For production use, we recommend deploying Flink Applications in the [Applicatio

The [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode) requires that the user code is bundled together with the Flink image because it runs the user code's `main()` method on the cluster.
The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.
Bundling can be done by modifying the base Flink Docker image, or via User Artifact Management, which makes it possible to upload local artifacts and to fetch artifacts that are not available locally.

#### Modify the Docker image

The Flink community provides a [base Docker image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#docker-hub-flink-images) which can be used to bundle the user code:

@@ -97,13 +100,44 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:

```bash
# Local Scheme
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar
```
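Once submitted, the JobManager pod of the application cluster should come up in the configured namespace. A quick sanity check, assuming `kubectl` points at the target cluster (Flink labels its native Kubernetes pods with the cluster ID):

```bash
# List the pods belonging to the application cluster
$ kubectl get pods -l app=my-first-application-cluster
```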

#### Configure User Artifact Management

In case you have a locally available Flink job JAR, artifact upload can be used, so that Flink uploads the local artifact to the configured DFS during deployment and fetches it on the deployed JobManager pod:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    local:///tmp/my-flink-job.jar
```

The `kubernetes.artifacts.local-upload-enabled` option enables this feature, and `kubernetes.artifacts.local-upload-target` has to point to a valid remote target that exists and has the proper permissions configured.
You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts; note that the `;` separator has to be escaped as `\;` so the shell does not interpret it:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
    local:///tmp/my-flink-job.jar
```

In case the job JAR or any additional artifact is already available remotely via DFS or HTTP(S), Flink simply fetches it on the deployed JobManager pod:

```bash
# FileSystem
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    s3://my-bucket/my-flink-job.jar

# HTTP(S)
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    https://ip:port/my-flink-job.jar
```

{{< hint warning >}}
Please be aware that already existing artifacts will not be overwritten during a local upload, unless `kubernetes.artifacts.local-upload-overwrite` is enabled!
{{< /hint >}}
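If a re-deployment should replace a previously uploaded artifact, the overwrite behavior can be enabled explicitly. A sketch reusing the hypothetical bucket from above:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    -Dkubernetes.artifacts.local-upload-overwrite=true \
    local:///tmp/my-flink-job.jar
```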

{{< hint info >}}
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to the
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in the image.
{{< /hint >}}

<span class="label label-info">Note</span> The `local` scheme is still supported. If you use the `local` scheme, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).

The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.

8 changes: 5 additions & 3 deletions docs/content/docs/deployment/config.md
@@ -404,11 +404,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----

# Artifact Fetching
# User Artifact Management

Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
Flink is capable of uploading and fetching local user artifacts in Application Mode. An artifact can be the actual job archive, a separately packaged UDF, etc.
1. Uploading local artifacts to a DFS is a Kubernetes-specific feature; see the [Kubernetes](#kubernetes) section and look for the `kubernetes.artifacts.*` prefixed options.
2. Fetching remote artifacts on the deployed application cluster is supported from a DFS or an HTTP(S) endpoint.
{{< hint info >}}
**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
**Note:** Artifact Fetching is supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}

{{< generated/artifact_fetch_configuration >}}
@@ -93,6 +93,9 @@ For high-level intuition behind the application mode, please refer to the [deplo

The [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode) requires that the user code is bundled together with the Flink image because it runs the user code's `main()` method on the cluster.
The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.
Bundling can be done by modifying the base Flink Docker image, or via User Artifact Management, which makes it possible to upload local artifacts and to fetch artifacts that are not available locally.

#### Modify the Docker image

The Flink community provides a [base Docker image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#docker-hub-flink-images) which can be used to bundle the user code:

@@ -105,13 +108,44 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:

```bash
# Local Scheme
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar
```

#### Configure User Artifact Management

In case you have a locally available Flink job JAR, artifact upload can be used, so that Flink uploads the local artifact to the configured DFS during deployment and fetches it on the deployed JobManager pod:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    local:///tmp/my-flink-job.jar
```

The `kubernetes.artifacts.local-upload-enabled` option enables this feature, and `kubernetes.artifacts.local-upload-target` has to point to a valid remote target that exists and has the proper permissions configured.
You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts; note that the `;` separator has to be escaped as `\;` so the shell does not interpret it:

```bash
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
    local:///tmp/my-flink-job.jar
```
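After a successful submission, the uploaded artifacts should be visible under the configured upload target. For an S3 target this can be verified with the AWS CLI, for example (hypothetical bucket, assuming credentials are configured):

```bash
# The uploaded job JAR and UDF should be listed here
$ aws s3 ls s3://my-bucket/
```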

In case the job JAR or any additional artifact is already available remotely via DFS or HTTP(S), Flink simply fetches it on the deployed JobManager pod:

```bash
# FileSystem
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    s3://my-bucket/my-flink-job.jar

# HTTP(S)
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    https://ip:port/my-flink-job.jar
```

{{< hint warning >}}
Please be aware that already existing artifacts will not be overwritten during a local upload, unless `kubernetes.artifacts.local-upload-overwrite` is enabled!
{{< /hint >}}

{{< hint info >}}
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to the
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in the image.
{{< /hint >}}
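As a concrete illustration, assuming the default `user.artifacts.base-dir` of `/opt/flink/artifacts` (an assumption; see the configuration reference), the `default` namespace, and the cluster ID used above, the fetched job JAR would end up at:

```bash
# Hypothetical artifact location inside the JobManager pod
/opt/flink/artifacts/default/my-first-application-cluster/my-flink-job.jar
```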

<span class="label label-info">Note</span> The `local` scheme is still supported. If you use the `local` scheme, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).

The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.

@@ -14,6 +14,24 @@
<td>String</td>
<td>If configured, Flink will add "resources.limits.&lt;config-key&gt;" and "resources.requests.&lt;config-key&gt;" to the main container of TaskExecutor and set the value to the value of external-resource.&lt;resource_name&gt;.amount.</td>
</tr>
<tr>
<td><h5>kubernetes.artifacts.local-upload-enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enables uploading 'local://' schemed artifacts to DFS before the the application cluster deployment.</td>
</tr>
<tr>
<td><h5>kubernetes.artifacts.local-upload-overwrite</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, overwrites any existing artifact on the remote target. Disabled by default.</td>
</tr>
<tr>
<td><h5>kubernetes.artifacts.local-upload-target</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The target remote DFS directory to upload local artifacts.</td>
</tr>
<tr>
<td><h5>kubernetes.client.io-pool.size</h5></td>
<td style="word-wrap: break-word;">4</td>
@@ -159,7 +159,7 @@ private File fetchArtifact(String uri) throws Exception {

    private boolean isRawHttp(String uriScheme) {
        if ("http".equals(uriScheme)) {
            if (conf.getBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED)) {
            if (conf.get(ArtifactFetchOptions.RAW_HTTP_ENABLED)) {
                return true;
            }
            throw new IllegalArgumentException(
@@ -19,6 +19,7 @@

import org.apache.flink.client.cli.ArtifactFetchOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.testutils.TestingUtils;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -37,11 +38,8 @@
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -60,12 +58,12 @@ class ArtifactFetchManagerTest {
    @BeforeEach
    void setup() {
        configuration = new Configuration();
        configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
        configuration.set(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
    }

    @Test
    void testGetFetcher() throws Exception {
        configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
        configuration.set(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
        ArtifactFetchManager fetchManager = new ArtifactFetchManager(configuration);

        ArtifactFetcher fetcher = fetchManager.getFetcher(new URI("local:///a.jar"));
@@ -86,7 +84,7 @@

    @Test
    void testFileSystemFetchWithoutAdditionalUri() throws Exception {
        File sourceFile = getDummyArtifact(getClass());
        File sourceFile = TestingUtils.getClassFile(getClass());
        String uriStr = "file://" + sourceFile.toURI().getPath();

        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
@@ -98,7 +96,7 @@

    @Test
    void testFileSystemFetchWithAdditionalUri() throws Exception {
        File sourceFile = getDummyArtifact(getClass());
        File sourceFile = TestingUtils.getClassFile(getClass());
        String uriStr = "file://" + sourceFile.toURI().getPath();
        File additionalSrcFile = getFlinkClientsJar();
        String additionalUriStr = "file://" + additionalSrcFile.toURI().getPath();
@@ -115,7 +113,7 @@

    @Test
    void testHttpFetch() throws Exception {
        configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
        configuration.set(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
        HttpServer httpServer = null;
        try {
            httpServer = startHttpServer();
@@ -141,7 +139,7 @@

    @Test
    void testMixedArtifactFetch() throws Exception {
        File sourceFile = getDummyArtifact(getClass());
        File sourceFile = TestingUtils.getClassFile(getClass());
        String uriStr = "file://" + sourceFile.toURI().getPath();
        File sourceFile2 = getFlinkClientsJar();
        String uriStr2 = "file://" + sourceFile2.toURI().getPath();
@@ -161,7 +159,7 @@ void testNoFetchOverride() throws Exception {
                new ArtifactFetchManager(
                        dummyFetcher, dummyFetcher, dummyFetcher, configuration, null);

        File sourceFile = getDummyArtifact(getClass());
        File sourceFile = TestingUtils.getClassFile(getClass());
        Path destFile = tempDir.resolve(sourceFile.getName());
        Files.copy(sourceFile.toPath(), destFile);

@@ -220,34 +218,13 @@ private HttpServer startHttpServer() throws IOException {
        return httpServer;
    }

    private File getDummyArtifact(Class<?> cls) {
        String className = String.format("%s.class", cls.getSimpleName());
        URL url = cls.getResource(className);
        assertThat(url).isNotNull();

        return new File(url.getPath());
    }

    private File getFlinkClientsJar() throws IOException {
        String pathStr =
                ArtifactFetchManager.class
                        .getProtectionDomain()
                        .getCodeSource()
                        .getLocation()
                        .getPath();
        Path mvnTargetDir = Paths.get(pathStr).getParent();

        Collection<Path> jarPaths =
                org.apache.flink.util.FileUtils.listFilesInDirectory(
                        mvnTargetDir,
                        p ->
                                org.apache.flink.util.FileUtils.isJarFile(p)
                                        && p.toFile().getName().startsWith("flink-clients")
                                        && !p.toFile().getName().contains("test-utils"));

        assertThat(jarPaths).isNotEmpty();

        return jarPaths.iterator().next().toFile();
        return TestingUtils.getFileFromTargetDir(
                ArtifactFetchManager.class,
                p ->
                        org.apache.flink.util.FileUtils.isJarFile(p)
                                && p.toFile().getName().startsWith("flink-clients")
                                && !p.toFile().getName().contains("test-utils"));
    }

    private static class DummyHttpDownloadHandler implements HttpHandler {
@@ -22,10 +22,19 @@
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.executor.TestExecutorResource;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;

/** Convenience functions to test actor based components. */
public class TestingUtils {
@@ -56,4 +65,24 @@ public static TestExecutorResource<ScheduledExecutorService> defaultExecutorReso
    public static UUID zeroUUID() {
        return ZERO_UUID;
    }

    /** Returns the compiled {@code .class} file of the given class from the classpath. */
    public static File getClassFile(Class<?> cls) {
        String className = String.format("%s.class", cls.getSimpleName());
        URL url = cls.getResource(className);
        assertThat(url).isNotNull();

        return new File(url.getPath());
    }

    /**
     * Returns the first file from the Maven target directory of the given class that matches the
     * given filter.
     */
    public static File getFileFromTargetDir(Class<?> cls, Predicate<Path> fileFilter)
            throws IOException {
        final String pathStr = cls.getProtectionDomain().getCodeSource().getLocation().getPath();
        final Path mvnTargetDir = Paths.get(pathStr).getParent();

        final Collection<Path> jarPaths =
                org.apache.flink.util.FileUtils.listFilesInDirectory(mvnTargetDir, fileFilter);
        assertThat(jarPaths).isNotEmpty();

        return jarPaths.iterator().next().toFile();
    }
}