diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 711d97aef40f7..94addd8ccadda 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -402,11 +402,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----
-# Artifact Fetching
+# User Artifact Management
-Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
+Flink can upload and fetch local user artifacts in Application Mode. An artifact can be the job JAR itself, a separately packaged UDF, etc.
+1. Uploading local artifacts to a DFS is a Kubernetes-specific feature; see the [Kubernetes](#kubernetes) section and look for the `kubernetes.artifacts.*` prefixed options.
+2. Fetching remote artifacts onto the deployed application cluster is supported from a DFS or an HTTP(S) endpoint.
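+
+For example, on native Kubernetes the two features can be combined in a single deployment (a minimal sketch, assuming an S3 bucket and example JAR paths):
+
+```bash
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.artifacts.local-upload-enabled=true \
+    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+    -Duser.artifacts.artifact-list=s3://my-bucket/my-flink-udf.jar \
+    local:///tmp/my-flink-job.jar
+```
+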
{{< hint info >}}
-**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
+**Note:** Artifact fetching is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}
{{< generated/artifact_fetch_configuration >}}
diff --git a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
index d8ca037d44166..0f8f586d09e34 100644
--- a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
@@ -85,6 +85,9 @@ For production use, we recommend deploying Flink Applications in the [Applicatio
The [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode) requires that the user code is bundled together with the Flink image because it runs the user code's `main()` method on the cluster.
The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.
+Bundling can be done by modifying the base Flink Docker image, or via User Artifact Management, which makes it possible to upload and fetch artifacts that are not bundled into the image.
+
+#### Modify the Docker image
The Flink community provides a [base Docker image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#docker-hub-flink-images) which can be used to bundle the user code:
@@ -97,13 +100,44 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:
```bash
-# Local Schema
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image.ref=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
+```
+
+#### Configure User Artifact Management
+
+If you have a locally available Flink job JAR, artifact upload can be used, so that Flink uploads the local artifact to DFS during deployment and fetches it onto the deployed JobManager pod:
+
+```bash
+$ ./bin/flink run-application \
+ --target kubernetes-application \
+ -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+ -Dkubernetes.artifacts.local-upload-enabled=true \
+ -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+ local:///tmp/my-flink-job.jar
+```
+
+The `kubernetes.artifacts.local-upload-enabled` option enables this feature, and `kubernetes.artifacts.local-upload-target` must point to a valid remote target that exists and has the proper permissions configured.
+You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts:
+
+```bash
+$ ./bin/flink run-application \
+ --target kubernetes-application \
+ -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+ -Dkubernetes.artifacts.local-upload-enabled=true \
+ -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+ -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
+ local:///tmp/my-flink-job.jar
+```
+
+If the job JAR or any additional artifact is already available remotely via DFS or HTTP(S), Flink will simply fetch it onto the deployed JobManager pod:
+```bash
# FileSystem
$ ./bin/flink run-application \
--target kubernetes-application \
@@ -118,14 +152,17 @@ $ ./bin/flink run-application \
-Dkubernetes.container.image=custom-image-name \
https://ip:port/my-flink-job.jar
```
+
+{{< hint warning >}}
+Please be aware that existing artifacts are not overwritten during a local upload by default! To overwrite them, enable the `kubernetes.artifacts.local-upload-overwrite` option.
+{{< /hint >}}
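+
+For example, when re-deploying with a rebuilt job JAR, the overwrite flag can be enabled (a sketch reusing the example bucket and JAR paths from above):
+
+```bash
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+    -Dkubernetes.artifacts.local-upload-enabled=true \
+    -Dkubernetes.artifacts.local-upload-overwrite=true \
+    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+    local:///tmp/my-flink-job.jar
+```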
+
{{< hint info >}}
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
{{< /hint >}}
-Note `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).
-
The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index cc903eb34f21f..1215d7bd7b2f9 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -404,11 +404,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
----
----
-# Artifact Fetching
+# User Artifact Management
-Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
+Flink can upload and fetch local user artifacts in Application Mode. An artifact can be the job JAR itself, a separately packaged UDF, etc.
+1. Uploading local artifacts to a DFS is a Kubernetes-specific feature; see the [Kubernetes](#kubernetes) section and look for the `kubernetes.artifacts.*` prefixed options.
+2. Fetching remote artifacts onto the deployed application cluster is supported from a DFS or an HTTP(S) endpoint.
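+
+For example, on native Kubernetes the two features can be combined in a single deployment (a minimal sketch, assuming an S3 bucket and example JAR paths):
+
+```bash
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.artifacts.local-upload-enabled=true \
+    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+    -Duser.artifacts.artifact-list=s3://my-bucket/my-flink-udf.jar \
+    local:///tmp/my-flink-job.jar
+```
+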
{{< hint info >}}
-**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
+**Note:** Artifact fetching is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
{{< /hint >}}
{{< generated/artifact_fetch_configuration >}}
diff --git a/docs/content/docs/deployment/resource-providers/native_kubernetes.md b/docs/content/docs/deployment/resource-providers/native_kubernetes.md
index 6748afd0a9503..c823f787838df 100644
--- a/docs/content/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/content/docs/deployment/resource-providers/native_kubernetes.md
@@ -93,6 +93,9 @@ For high-level intuition behind the application mode, please refer to the [deplo
The [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode) requires that the user code is bundled together with the Flink image because it runs the user code's `main()` method on the cluster.
The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.
+Bundling can be done by modifying the base Flink Docker image, or via User Artifact Management, which makes it possible to upload and fetch artifacts that are not bundled into the image.
+
+#### Modify the Docker image
The Flink community provides a [base Docker image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#docker-hub-flink-images) which can be used to bundle the user code:
@@ -105,13 +108,44 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:
```bash
-# Local Schema
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image.ref=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
+```
+
+#### Configure User Artifact Management
+
+If you have a locally available Flink job JAR, artifact upload can be used, so that Flink uploads the local artifact to DFS during deployment and fetches it onto the deployed JobManager pod:
+
+```bash
+$ ./bin/flink run-application \
+ --target kubernetes-application \
+ -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+ -Dkubernetes.artifacts.local-upload-enabled=true \
+ -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+ local:///tmp/my-flink-job.jar
+```
+
+The `kubernetes.artifacts.local-upload-enabled` option enables this feature, and `kubernetes.artifacts.local-upload-target` must point to a valid remote target that exists and has the proper permissions configured.
+You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts:
+
+```bash
+$ ./bin/flink run-application \
+ --target kubernetes-application \
+ -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+ -Dkubernetes.artifacts.local-upload-enabled=true \
+ -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+ -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
+ local:///tmp/my-flink-job.jar
+```
+
+If the job JAR or any additional artifact is already available remotely via DFS or HTTP(S), Flink will simply fetch it onto the deployed JobManager pod:
+```bash
# FileSystem
$ ./bin/flink run-application \
--target kubernetes-application \
@@ -126,14 +160,17 @@ $ ./bin/flink run-application \
-Dkubernetes.container.image=custom-image-name \
https://ip:port/my-flink-job.jar
```
+
+{{< hint warning >}}
+Please be aware that existing artifacts are not overwritten during a local upload by default! To overwrite them, enable the `kubernetes.artifacts.local-upload-overwrite` option.
+{{< /hint >}}
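+
+For example, when re-deploying with a rebuilt job JAR, the overwrite flag can be enabled (a sketch reusing the example bucket and JAR paths from above):
+
+```bash
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image.ref=custom-image-name \
+    -Dkubernetes.artifacts.local-upload-enabled=true \
+    -Dkubernetes.artifacts.local-upload-overwrite=true \
+    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
+    local:///tmp/my-flink-job.jar
+```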
+
{{< hint info >}}
JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
The JAR will be downloaded to
[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
{{< /hint >}}
-Note `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).
-
The `kubernetes.cluster-id` option specifies the cluster name and must be unique.
If you do not specify this option, then Flink will generate a random name.
diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 49c5e96cec7d3..68ea05ca32cc8 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -14,6 +14,24 @@
            <td>String</td>
            <td>If configured, Flink will add "resources.limits.&lt;config-key&gt;" and "resources.requests.&lt;config-key&gt;" to the main container of TaskExecutor and set the value to the value of external-resource.&lt;resource_name&gt;.amount.</td>
        </tr>
+        <tr>
+            <td><h5>kubernetes.artifacts.local-upload-enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables uploading 'local://' schemed artifacts to DFS before the application cluster deployment.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.artifacts.local-upload-overwrite</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled, overwrites any existing artifact on the remote target. Disabled by default.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.artifacts.local-upload-target</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The remote DFS directory where local artifacts are uploaded.</td>
+        </tr>
        <tr>
            <td><h5>kubernetes.client.io-pool.size</h5></td>
            <td style="word-wrap: break-word;">4</td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
index 20d491433bdcb..42683b9139b96 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
@@ -159,7 +159,7 @@ private File fetchArtifact(String uri) throws Exception {
private boolean isRawHttp(String uriScheme) {
if ("http".equals(uriScheme)) {
- if (conf.getBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED)) {
+ if (conf.get(ArtifactFetchOptions.RAW_HTTP_ENABLED)) {
return true;
}
throw new IllegalArgumentException(
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java
index f41c8237a315a..b55d38163df21 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java
@@ -19,6 +19,7 @@
import org.apache.flink.client.cli.ArtifactFetchOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.TestingUtils;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -37,11 +38,8 @@
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collection;
import java.util.Collections;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -60,12 +58,12 @@ class ArtifactFetchManagerTest {
@BeforeEach
void setup() {
configuration = new Configuration();
- configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
+ configuration.set(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
}
@Test
void testGetFetcher() throws Exception {
- configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
+ configuration.set(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
ArtifactFetchManager fetchManager = new ArtifactFetchManager(configuration);
ArtifactFetcher fetcher = fetchManager.getFetcher(new URI("local:///a.jar"));
@@ -86,7 +84,7 @@ void testGetFetcher() throws Exception {
@Test
void testFileSystemFetchWithoutAdditionalUri() throws Exception {
- File sourceFile = getDummyArtifact(getClass());
+ File sourceFile = TestingUtils.getClassFile(getClass());
String uriStr = "file://" + sourceFile.toURI().getPath();
ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
@@ -98,7 +96,7 @@ void testFileSystemFetchWithoutAdditionalUri() throws Exception {
@Test
void testFileSystemFetchWithAdditionalUri() throws Exception {
- File sourceFile = getDummyArtifact(getClass());
+ File sourceFile = TestingUtils.getClassFile(getClass());
String uriStr = "file://" + sourceFile.toURI().getPath();
File additionalSrcFile = getFlinkClientsJar();
String additionalUriStr = "file://" + additionalSrcFile.toURI().getPath();
@@ -115,7 +113,7 @@ void testFileSystemFetchWithAdditionalUri() throws Exception {
@Test
void testHttpFetch() throws Exception {
- configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
+ configuration.set(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
HttpServer httpServer = null;
try {
httpServer = startHttpServer();
@@ -141,7 +139,7 @@ void testHttpFetch() throws Exception {
@Test
void testMixedArtifactFetch() throws Exception {
- File sourceFile = getDummyArtifact(getClass());
+ File sourceFile = TestingUtils.getClassFile(getClass());
String uriStr = "file://" + sourceFile.toURI().getPath();
File sourceFile2 = getFlinkClientsJar();
String uriStr2 = "file://" + sourceFile2.toURI().getPath();
@@ -161,7 +159,7 @@ void testNoFetchOverride() throws Exception {
new ArtifactFetchManager(
dummyFetcher, dummyFetcher, dummyFetcher, configuration, null);
- File sourceFile = getDummyArtifact(getClass());
+ File sourceFile = TestingUtils.getClassFile(getClass());
Path destFile = tempDir.resolve(sourceFile.getName());
Files.copy(sourceFile.toPath(), destFile);
@@ -220,34 +218,13 @@ private HttpServer startHttpServer() throws IOException {
return httpServer;
}
-    private File getDummyArtifact(Class<?> cls) {
- String className = String.format("%s.class", cls.getSimpleName());
- URL url = cls.getResource(className);
- assertThat(url).isNotNull();
-
- return new File(url.getPath());
- }
-
private File getFlinkClientsJar() throws IOException {
- String pathStr =
- ArtifactFetchManager.class
- .getProtectionDomain()
- .getCodeSource()
- .getLocation()
- .getPath();
- Path mvnTargetDir = Paths.get(pathStr).getParent();
-
-    Collection<Path> jarPaths =
- org.apache.flink.util.FileUtils.listFilesInDirectory(
- mvnTargetDir,
- p ->
- org.apache.flink.util.FileUtils.isJarFile(p)
- && p.toFile().getName().startsWith("flink-clients")
- && !p.toFile().getName().contains("test-utils"));
-
- assertThat(jarPaths).isNotEmpty();
-
- return jarPaths.iterator().next().toFile();
+ return TestingUtils.getFileFromTargetDir(
+ ArtifactFetchManager.class,
+ p ->
+ org.apache.flink.util.FileUtils.isJarFile(p)
+ && p.toFile().getName().startsWith("flink-clients")
+ && !p.toFile().getName().contains("test-utils"));
}
private static class DummyHttpDownloadHandler implements HttpHandler {
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java
index 3c85b9d2c617a..e8e2449b91c17 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java
@@ -22,10 +22,19 @@
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.executor.TestExecutorResource;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** Convenience functions to test actor based components. */
public class TestingUtils {
@@ -56,4 +65,24 @@ public static TestExecutorResource defaultExecutorReso
public static UUID zeroUUID() {
return ZERO_UUID;
}
+
+    public static File getClassFile(Class<?> cls) {
+ String className = String.format("%s.class", cls.getSimpleName());
+ URL url = cls.getResource(className);
+ assertThat(url).isNotNull();
+
+ return new File(url.getPath());
+ }
+
+    public static File getFileFromTargetDir(Class<?> cls, Predicate<Path> fileFilter)
+ throws IOException {
+ final String pathStr = cls.getProtectionDomain().getCodeSource().getLocation().getPath();
+ final Path mvnTargetDir = Paths.get(pathStr).getParent();
+
+        final Collection<Path> jarPaths =
+ org.apache.flink.util.FileUtils.listFilesInDirectory(mvnTargetDir, fileFilter);
+ assertThat(jarPaths).isNotEmpty();
+
+ return jarPaths.iterator().next().toFile();
+ }
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
index efc5f639764ce..025fe1ddded9a 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
@@ -23,6 +23,7 @@
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.kubernetes.artifact.DefaultKubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
@@ -56,7 +57,10 @@ public KubernetesClusterDescriptor createClusterDescriptor(Configuration configu
final String clusterId = generateClusterId();
configuration.set(KubernetesConfigOptions.CLUSTER_ID, clusterId);
}
- return new KubernetesClusterDescriptor(configuration, FlinkKubeClientFactory.getInstance());
+ return new KubernetesClusterDescriptor(
+ configuration,
+ FlinkKubeClientFactory.getInstance(),
+ new DefaultKubernetesArtifactUploader());
}
@Nullable
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index 45104e726dee2..b81520b55e5ab 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -33,6 +33,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.artifact.KubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
@@ -79,12 +80,17 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor {
private final FlinkKubeClient client;
+ private final KubernetesArtifactUploader artifactUploader;
+
private final String clusterId;
public KubernetesClusterDescriptor(
- Configuration flinkConfig, FlinkKubeClientFactory clientFactory) {
+ Configuration flinkConfig,
+ FlinkKubeClientFactory clientFactory,
+ KubernetesArtifactUploader artifactUploader) {
this.flinkConfig = flinkConfig;
this.clientFactory = clientFactory;
+ this.artifactUploader = artifactUploader;
this.client = clientFactory.fromConfiguration(flinkConfig, "client");
this.clusterId =
checkNotNull(
@@ -217,6 +223,12 @@ public ClusterClientProvider deployApplicationCluster(
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
}
+ try {
+ artifactUploader.uploadAll(flinkConfig);
+ } catch (Exception ex) {
+ throw new ClusterDeploymentException(ex);
+ }
+
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java
new file mode 100644
index 0000000000000..146bf49b7b50e
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Default {@link KubernetesArtifactUploader} implementation. */
+public class DefaultKubernetesArtifactUploader implements KubernetesArtifactUploader {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);
+
+ @Override
+ public void uploadAll(Configuration config) throws Exception {
+ if (!config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)) {
+ LOG.info(
+ "Local artifact uploading is disabled. Set '{}' to enable.",
+ KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
+ return;
+ }
+
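+        // Upload the job JAR (a no-op for non-'local' URIs) and point 'pipeline.jars' at the result.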
+ final String jobUri = upload(config, getJobUri(config));
+ updateConfig(config, PipelineOptions.JARS, Collections.singletonList(jobUri));
+
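+        // Upload any additional artifacts; remote URIs in the list pass through unchanged.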
+        final List<String> additionalUris =
+ config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+ .orElse(Collections.emptyList());
+
+        final List<String> uploadedAdditionalUris =
+ additionalUris.stream()
+ .map(
+ FunctionUtils.uncheckedFunction(
+ artifactUri -> upload(config, artifactUri)))
+ .collect(Collectors.toList());
+
+ updateConfig(config, ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
+ }
+
+ @VisibleForTesting
+ String upload(Configuration config, String artifactUriStr)
+ throws IOException, URISyntaxException {
+ final URI artifactUri = PackagedProgramUtils.resolveURI(artifactUriStr);
+ if (!"local".equals(artifactUri.getScheme())) {
+ return artifactUriStr;
+ }
+
+ final String targetDir = config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetDir),
+ String.format(
+ "Setting '%s' to a valid remote path is required.",
+ KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
+
+ final FileSystem.WriteMode writeMode =
+ config.get(KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE)
+ ? FileSystem.WriteMode.OVERWRITE
+ : FileSystem.WriteMode.NO_OVERWRITE;
+
+ final File src = new File(artifactUri.getPath());
+ final Path target = new Path(targetDir, src.getName());
+ if (target.getFileSystem().exists(target)
+ && writeMode == FileSystem.WriteMode.NO_OVERWRITE) {
+ LOG.info(
+ "Skip uploading artifact '{}', as it already exists."
+ + " To overwrite existing artifacts, please set the '{}' config option.",
+ target,
+ KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE.key());
+ } else {
+ final long start = System.currentTimeMillis();
+ final FileSystem fs = target.getFileSystem();
+ try (FSDataOutputStream os = fs.create(target, writeMode)) {
+ FileUtils.copyFile(src, os);
+ }
+ LOG.debug(
+ "Copied file from {} to {}, cost {} ms",
+ src,
+ target,
+ System.currentTimeMillis() - start);
+ }
+
+ return target.toString();
+ }
+
+ @VisibleForTesting
+ void updateConfig(
+            Configuration config, ConfigOption<List<String>> configOption, List<String> newValue) {
+        final List<String> originalValue =
+ config.getOptional(configOption).orElse(Collections.emptyList());
+
+ if (hasLocal(originalValue)) {
+ LOG.info(
+ "Updating configuration '{}' after to replace local artifact: '{}'",
+ configOption.key(),
+ newValue);
+ config.set(configOption, newValue);
+ }
+ }
+
+ private String getJobUri(Configuration config) {
+        final List<String> jars = config.get(PipelineOptions.JARS);
+ checkArgument(
+ jars.size() == 1,
+ String.format("The '%s' config must contain one JAR.", PipelineOptions.JARS.key()));
+
+        return jars.get(0);
+ }
+
+    private boolean hasLocal(List<String> originalUris) {
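+        // Plain substring check that matches both "local:/" and "local:///" URI forms.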
+ return originalUris.stream().anyMatch(uri -> uri.contains("local:/"));
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/KubernetesArtifactUploader.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/KubernetesArtifactUploader.java
new file mode 100644
index 0000000000000..16580b82a0235
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/KubernetesArtifactUploader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+/** Local artifact uploader for Kubernetes programs. */
+@Internal
+public interface KubernetesArtifactUploader {
+
+ /**
+     * Uploads all {@code local://} schemed artifacts that are present, according to the given
+ * configuration. Any remote artifact remains as it was passed originally.
+ *
+     * <p>Takes the job JAR from the {@link PipelineOptions#JARS} config and any additional
+ * artifacts from the {@link ArtifactFetchOptions#ARTIFACT_LIST} config. After the upload,
+ * replaces the URIs of any local JAR in these configs to point to the remotely available one.
+ *
+     * <p>Requires {@link KubernetesConfigOptions#LOCAL_UPLOAD_TARGET} to point to a valid, existing
+ * DFS directory with read and write permissions.
+ *
+ * @param config given Flink configuration
+ * @throws Exception when the upload process fails
+ */
+ void uploadAll(Configuration config) throws Exception;
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index a2e2e7c9b054e..88fb3fcfe3f05 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -537,6 +537,26 @@ public class KubernetesConfigOptions {
+ "Flink. A typical use-case is when one uses Flink Kubernetes "
+ "Operator.");
+    public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED =
+ ConfigOptions.key("kubernetes.artifacts.local-upload-enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Enables uploading 'local://' schemed artifacts to DFS before the the application cluster deployment.");
+
+    public static final ConfigOption<Boolean> LOCAL_UPLOAD_OVERWRITE =
+ ConfigOptions.key("kubernetes.artifacts.local-upload-overwrite")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If enabled, overwrites any existing artifact on the remote target. Disabled by default.");
+
+    public static final ConfigOption<String> LOCAL_UPLOAD_TARGET =
+ ConfigOptions.key("kubernetes.artifacts.local-upload-target")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The target remote DFS directory to upload local artifacts.");
+
/**
* This will only be used to support blocklist mechanism, which is experimental currently, so we
* do not want to expose this option in the documentation.
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
index 03ca1dfb570d8..a08263e988c32 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
@@ -86,7 +86,8 @@ public FlinkKubeClient fromConfiguration(
server.createClient().inNamespace(NAMESPACE),
Executors.newSingleThreadScheduledExecutor());
}
- });
+ },
+ config -> {});
}
@Test
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploaderTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploaderTest.java
new file mode 100644
index 0000000000000..77bddf7937d7b
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DefaultKubernetesArtifactUploaderTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.testutils.TestingUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultKubernetesArtifactUploader}. */
+class DefaultKubernetesArtifactUploaderTest {
+
+ private final DefaultKubernetesArtifactUploader artifactUploader =
+ new DefaultKubernetesArtifactUploader();
+
+ @TempDir private Path tmpDir;
+
+ private Configuration config;
+
+ private DummyFs dummyFs;
+
+ @BeforeEach
+ void setup() throws IOException {
+ config = new Configuration();
+ config.set(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED, true);
+ config.set(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET, getTargetDirUri());
+
+ dummyFs = (DummyFs) new org.apache.flink.core.fs.Path(getTargetDirUri()).getFileSystem();
+ dummyFs.resetCallCounters();
+ }
+
+ @Test
+ void testInvalidJobJar() {
+ String msg = "The 'pipeline.jars' config must contain one JAR.";
+
+ config.set(PipelineOptions.JARS, Collections.emptyList());
+ assertThatThrownBy(() -> artifactUploader.uploadAll(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(msg);
+
+ config.set(PipelineOptions.JARS, Arrays.asList("a", "b"));
+ assertThatThrownBy(() -> artifactUploader.uploadAll(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(msg);
+ }
+
+ @Test
+ void testUploadAllWithOneJobJar() throws Exception {
+ File jar = getFlinkKubernetesJar();
+ String localUri = "local://" + jar.getAbsolutePath();
+
+ config.set(PipelineOptions.JARS, Collections.singletonList(localUri));
+ artifactUploader.uploadAll(config);
+
+ assertJobJarUri(jar.getName());
+ }
+
+ @Test
+ void testUploadAllWithAdditionalArtifacts() throws Exception {
+ File jobJar = getFlinkKubernetesJar();
+ File addArtifact1 = TestingUtils.getClassFile(DefaultKubernetesArtifactUploader.class);
+ File addArtifact2 = TestingUtils.getClassFile(KubernetesUtils.class);
+ String localJobUri = "local://" + jobJar.getAbsolutePath();
+ String localAddArtUri = "local://" + addArtifact1.getAbsolutePath();
+ String nonLocalAddArtUri = "dummyfs://" + addArtifact2.getAbsolutePath();
+
+ config.set(PipelineOptions.JARS, Collections.singletonList(localJobUri));
+ config.set(
+ ArtifactFetchOptions.ARTIFACT_LIST,
+ Arrays.asList(nonLocalAddArtUri, localAddArtUri));
+ artifactUploader.uploadAll(config);
+
+ assertJobJarUri(jobJar.getName());
+
+        List<String> additionalArtifactsResult = config.get(ArtifactFetchOptions.ARTIFACT_LIST);
+ assertThat(additionalArtifactsResult).hasSize(2);
+ assertThat(additionalArtifactsResult)
+ .containsExactlyInAnyOrder(
+ nonLocalAddArtUri, "dummyfs:" + tmpDir.resolve(addArtifact1.getName()));
+ }
+
+ @Test
+ void testMissingTargetConf() {
+ config.removeConfig(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+
+ assertThatThrownBy(() -> artifactUploader.upload(config, "local:///tmp/my-artifact.jar"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Setting 'kubernetes.artifacts.local-upload-target' to a valid remote path is required.");
+ }
+
+ @Test
+ void testRemoteUri() throws Exception {
+ config.removeConfig(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
+ String remoteUri = "s3://my-bucket/my-artifact.jar";
+
+ String finalUri = artifactUploader.upload(config, remoteUri);
+
+ assertThat(finalUri).isEqualTo(remoteUri);
+ }
+
+ @Test
+ void testUpload() throws Exception {
+ File jar = getFlinkKubernetesJar();
+ String localUri = "local://" + jar.getAbsolutePath();
+
+ String expectedUri = "dummyfs:" + tmpDir.resolve(jar.getName());
+ String resultUri = artifactUploader.upload(config, localUri);
+
+ assertThat(resultUri).isEqualTo(expectedUri);
+ }
+
+ @Test
+ void testUploadNoOverwrite() throws Exception {
+ File jar = getFlinkKubernetesJar();
+ String localUri = "local://" + jar.getAbsolutePath();
+ Files.createFile(tmpDir.resolve(jar.getName()));
+
+ artifactUploader.upload(config, localUri);
+
+ assertThat(dummyFs.getExistsCallCounter()).isOne();
+ assertThat(dummyFs.getCreateCallCounter()).isZero();
+ }
+
+ @Test
+ void testUploadOverwrite() throws Exception {
+ File jar = getFlinkKubernetesJar();
+ String localUri = "local://" + jar.getAbsolutePath();
+ Files.createFile(tmpDir.resolve(jar.getName()));
+
+ config.set(KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE, true);
+ artifactUploader.upload(config, localUri);
+
+ assertThat(dummyFs.getExistsCallCounter()).isEqualTo(2);
+ assertThat(dummyFs.getCreateCallCounter()).isOne();
+ }
+
+ @Test
+ void testUpdateConfig() {
+        List<String> artifactList =
+ Arrays.asList("local:///tmp/artifact1.jar", "s3://my-bucket/artifact2.jar");
+ Configuration config = new Configuration();
+ config.set(ArtifactFetchOptions.ARTIFACT_LIST, artifactList);
+
+ List uploadedArtifactList = new ArrayList<>(artifactList);
+ uploadedArtifactList.set(0, getTargetDirUri() + "/artifact1.jar");
+ artifactUploader.updateConfig(
+ config, ArtifactFetchOptions.ARTIFACT_LIST, uploadedArtifactList);
+
+ assertThat(config.get(ArtifactFetchOptions.ARTIFACT_LIST)).isEqualTo(uploadedArtifactList);
+ }
+
+ @Test
+ void testNoUpdateConfig() {
+        List<String> artifactList = Collections.singletonList("s3://my-bucket/my-artifact.jar");
+ Configuration config = new Configuration();
+ config.set(ArtifactFetchOptions.ARTIFACT_LIST, artifactList);
+
+ artifactUploader.updateConfig(config, ArtifactFetchOptions.ARTIFACT_LIST, artifactList);
+
+ assertThat(config.get(ArtifactFetchOptions.ARTIFACT_LIST)).isEqualTo(artifactList);
+ }
+
+ private String getTargetDirUri() {
+ return "dummyfs://" + tmpDir;
+ }
+
+ private File getFlinkKubernetesJar() throws IOException {
+ return TestingUtils.getFileFromTargetDir(
+ DefaultKubernetesArtifactUploader.class,
+ p ->
+ org.apache.flink.util.FileUtils.isJarFile(p)
+ && p.toFile().getName().startsWith("flink-kubernetes"));
+ }
+
+ private void assertJobJarUri(String filename) {
+ String expectedUri = "dummyfs:" + tmpDir.resolve(filename);
+
+        List<String> result = config.get(PipelineOptions.JARS);
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0)).isEqualTo(expectedUri);
+ }
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFs.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFs.java
new file mode 100644
index 0000000000000..cd80607587874
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFs.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Dummy filesystem to test local artifact upload. */
+public class DummyFs extends LocalFileSystem {
+
+ static final URI FS_URI = URI.create("dummyfs:///");
+
+ private int existsCallCounter;
+
+ private int createCallCounter;
+
+ @Override
+ public URI getUri() {
+ return FS_URI;
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ ++existsCallCounter;
+ return super.exists(f);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException {
+ ++createCallCounter;
+ return super.create(filePath, overwrite);
+ }
+
+ public void resetCallCounters() {
+ createCallCounter = 0;
+ existsCallCounter = 0;
+ }
+
+ public int getExistsCallCounter() {
+ return existsCallCounter;
+ }
+
+ public int getCreateCallCounter() {
+ return createCallCounter;
+ }
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFsFactory.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFsFactory.java
new file mode 100644
index 0000000000000..f2af0dd3592f4
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/artifact/DummyFsFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Dummy filesystem factory to test local artifact upload. */
+public class DummyFsFactory implements FileSystemFactory {
+
+ @Override
+ public String getScheme() {
+ return DummyFs.FS_URI.getScheme();
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return new DummyFs();
+ }
+}
diff --git a/flink-kubernetes/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-kubernetes/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000000000..d4a677f649efe
--- /dev/null
+++ b/flink-kubernetes/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.artifact.DummyFsFactory