diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fa86da9ae9669..3de080b57e3c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -308,21 +308,22 @@ private[spark] class SparkSubmit extends Logging {
args.ivySettingsPath)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
- // In K8s client mode, when in the driver, add resolved jars early as we might need
- // them at the submit time for artifact downloading.
- // For example we might use the dependencies for downloading
- // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
- // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
- if (isKubernetesClusterModeDriver) {
- val loader = getSubmitClassLoader(sparkConf)
- for (jar <- resolvedMavenCoordinates.split(",")) {
- addJarToClasspath(jar, loader)
- }
- } else if (isKubernetesCluster) {
+ if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
} else {
+ // In K8s client mode, when in the driver, add resolved jars early as we might need
+ // them at submit time for artifact downloading.
+ // For example, we might use the dependencies for downloading
+ // files from a Hadoop-compatible fs, e.g. S3. In this case the user might pass:
+ // --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
+ if (isKubernetesClusterModeDriver) {
+ val loader = getSubmitClassLoader(sparkConf)
+ for (jar <- resolvedMavenCoordinates.split(",")) {
+ addJarToClasspath(jar, loader)
+ }
+ }
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index f4a8556c71f6e..fa85a3b9b2c54 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -104,7 +104,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
override def typeName: String = "matrix"
- override def pyUDT: String = "pyspark.ml.linalg.MatrixUDT"
+ override def pyUDT: String = "pyspark3.ml.linalg.MatrixUDT"
private[spark] override def asNullable: MatrixUDT = this
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 35bbaf5aa1ded..e63542b390129 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -69,7 +69,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
}
}
- override def pyUDT: String = "pyspark.ml.linalg.VectorUDT"
+ override def pyUDT: String = "pyspark3.ml.linalg.VectorUDT"
override def userClass: Class[Vector] = classOf[Vector]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
index da62f8518e363..8bed821f61fc7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
@@ -31,7 +31,7 @@ import org.apache.spark.mllib.api.python.SerDeBase
*/
private[spark] object MLSerDe extends SerDeBase with Serializable {
- override val PYSPARK_PACKAGE = "pyspark.ml"
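+ // The bundled Python package is renamed to "pyspark3" in this build; keep the SerDe
+ // package prefix in sync so pickled ML types resolve to the renamed modules.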
+ override val PYSPARK_PACKAGE = "pyspark3.ml"
// Pickler for DenseVector
private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 57edc965112ef..92a2b46e1eff1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -256,7 +256,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
override def typeName: String = "matrix"
- override def pyUDT: String = "pyspark.mllib.linalg.MatrixUDT"
+ override def pyUDT: String = "pyspark3.mllib.linalg.MatrixUDT"
private[spark] override def asNullable: MatrixUDT = this
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 9ed9dd0c88c9b..d6f714615d613 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -313,7 +313,7 @@ class VectorUDT extends UserDefinedType[Vector] {
}
}
- override def pyUDT: String = "pyspark.mllib.linalg.VectorUDT"
+ override def pyUDT: String = "pyspark3.mllib.linalg.VectorUDT"
override def userClass: Class[Vector] = classOf[Vector]
diff --git a/python/pyspark3/ml/linalg/__init__.py b/python/pyspark3/ml/linalg/__init__.py
index c6570ca4c346e..ed0db9ad31f2a 100644
--- a/python/pyspark3/ml/linalg/__init__.py
+++ b/python/pyspark3/ml/linalg/__init__.py
@@ -134,7 +134,7 @@ def sqlType(cls):
@classmethod
def module(cls):
- return "pyspark.ml.linalg"
+ return "pyspark3.ml.linalg"
@classmethod
def scalaUDT(cls):
@@ -184,7 +184,7 @@ def sqlType(cls):
@classmethod
def module(cls):
- return "pyspark.ml.linalg"
+ return "pyspark3.ml.linalg"
@classmethod
def scalaUDT(cls):
diff --git a/python/pyspark3/ml/util.py b/python/pyspark3/ml/util.py
index a55ba0843624b..053289e0ea14e 100644
--- a/python/pyspark3/ml/util.py
+++ b/python/pyspark3/ml/util.py
@@ -297,7 +297,7 @@ def _java_loader_class(cls, clazz):
implementation replaces "pyspark" by "org.apache.spark" in
the Python full class name.
"""
- java_package = clazz.__module__.replace("pyspark", "org.apache.spark")
+ java_package = clazz.__module__.replace("pyspark3", "org.apache.spark")
if clazz.__name__ in ("Pipeline", "PipelineModel"):
# Remove the last package name "pipeline" for Pipeline and PipelineModel.
java_package = ".".join(java_package.split(".")[0:-1])
@@ -573,7 +573,7 @@ def getAndSetParams(instance, metadata, skipParams=None):
@staticmethod
def isPythonParamsInstance(metadata):
- return metadata['class'].startswith('pyspark.ml.')
+ return metadata['class'].startswith('pyspark3.ml.') or metadata['class'].startswith('pyspark.ml.')
@staticmethod
def loadParamsInstance(path, sc):
@@ -584,8 +584,10 @@ def loadParamsInstance(path, sc):
metadata = DefaultParamsReader.loadMetadata(path, sc)
if DefaultParamsReader.isPythonParamsInstance(metadata):
pythonClassName = metadata['class']
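+ # Metadata saved by a stock "pyspark" build records classes under the "pyspark"
+ # package, so remap the leading package name to the renamed "pyspark3" package.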
+ if pythonClassName.split('.')[0] == 'pyspark':
+ pythonClassName = pythonClassName.replace("pyspark", "pyspark3", 1)
else:
- pythonClassName = metadata['class'].replace("org.apache.spark", "pyspark")
+ pythonClassName = metadata['class'].replace("org.apache.spark", "pyspark3")
py_type = DefaultParamsReader.__get_class(pythonClassName)
instance = py_type.load(path)
return instance
diff --git a/python/pyspark3/ml/wrapper.py b/python/pyspark3/ml/wrapper.py
index 2e2013c59a49c..76eeb84c2c8e3 100644
--- a/python/pyspark3/ml/wrapper.py
+++ b/python/pyspark3/ml/wrapper.py
@@ -244,7 +244,7 @@ def __get_class(clazz):
for comp in parts[1:]:
m = getattr(m, comp)
return m
- stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark")
+ stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark3")
# Generate a default new instance from the stage_name class.
py_type = __get_class(stage_name)
if issubclass(py_type, JavaParams):
diff --git a/python/pyspark3/mllib/linalg/__init__.py b/python/pyspark3/mllib/linalg/__init__.py
index 772cdaf4ba98e..af77c45cb998d 100644
--- a/python/pyspark3/mllib/linalg/__init__.py
+++ b/python/pyspark3/mllib/linalg/__init__.py
@@ -137,7 +137,7 @@ def sqlType(cls):
@classmethod
def module(cls):
- return "pyspark.mllib.linalg"
+ return "pyspark3.mllib.linalg"
@classmethod
def scalaUDT(cls):
@@ -187,7 +187,7 @@ def sqlType(cls):
@classmethod
def module(cls):
- return "pyspark.mllib.linalg"
+ return "pyspark3.mllib.linalg"
@classmethod
def scalaUDT(cls):
diff --git a/python/pyspark3/mllib/util.py b/python/pyspark3/mllib/util.py
index bce6d085af03e..a560557220af3 100644
--- a/python/pyspark3/mllib/util.py
+++ b/python/pyspark3/mllib/util.py
@@ -511,7 +511,7 @@ def _java_loader_class(cls):
implementation replaces "pyspark" by "org.apache.spark" in
the Python full class name.
"""
- java_package = cls.__module__.replace("pyspark", "org.apache.spark")
+ java_package = cls.__module__.replace("pyspark3", "org.apache.spark")
return ".".join([java_package, cls.__name__])
@classmethod
diff --git a/python/pyspark3/version.py b/python/pyspark3/version.py
index 8a4b6fbe42965..28cdba7ad5ad3 100644
--- a/python/pyspark3/version.py
+++ b/python/pyspark3/version.py
@@ -16,4 +16,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "3.1.2+affirm4"
+__version__ = "3.1.2+affirm9"
diff --git a/python/setup.py b/python/setup.py
index 6b0441d4a035c..2f31b70c3c043 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -76,6 +76,7 @@
USER_SCRIPTS_PATH = os.path.join(SPARK_HOME, "sbin")
DATA_PATH = os.path.join(SPARK_HOME, "data")
LICENSES_PATH = os.path.join(SPARK_HOME, "licenses")
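+# Dockerfiles for the Spark-on-K8s images; symlinked/copied under deps/k8s below and
+# shipped with the wheel as the pyspark3.k8s package.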
+DOCKER_PATH = os.path.join(SPARK_HOME, "resource-managers/kubernetes/docker/src/main/dockerfiles/spark")
SCRIPTS_TARGET = os.path.join(TEMP_PATH, "bin")
USER_SCRIPTS_TARGET = os.path.join(TEMP_PATH, "sbin")
@@ -83,6 +84,7 @@
EXAMPLES_TARGET = os.path.join(TEMP_PATH, "examples")
DATA_TARGET = os.path.join(TEMP_PATH, "data")
LICENSES_TARGET = os.path.join(TEMP_PATH, "licenses")
+DOCKER_TARGET = os.path.join(TEMP_PATH, "k8s")
# Check and see if we are under the spark path in which case we need to build the symlink farm.
# This is important because we only want to build the symlink farm while under Spark otherwise we
@@ -167,6 +169,7 @@ def run(self):
os.symlink(EXAMPLES_PATH, EXAMPLES_TARGET)
os.symlink(DATA_PATH, DATA_TARGET)
os.symlink(LICENSES_PATH, LICENSES_TARGET)
+ os.symlink(DOCKER_PATH, DOCKER_TARGET)
else:
# For windows fall back to the slower copytree
copytree(JARS_PATH, JARS_TARGET)
@@ -175,6 +178,7 @@ def run(self):
copytree(EXAMPLES_PATH, EXAMPLES_TARGET)
copytree(DATA_PATH, DATA_TARGET)
copytree(LICENSES_PATH, LICENSES_TARGET)
+ copytree(DOCKER_PATH, DOCKER_TARGET)
else:
# If we are not inside of SPARK_HOME verify we have the required symlink farm
if not os.path.exists(JARS_TARGET):
@@ -224,12 +228,14 @@ def run(self):
'pyspark3.data',
'pyspark3.licenses',
'pyspark3.resource',
+ 'pyspark3.k8s',
'pyspark3.examples.src.main.python'],
include_package_data=True,
package_dir={
'pyspark3.jars': 'deps/jars',
'pyspark3.bin': 'deps/bin',
'pyspark3.sbin': 'deps/sbin',
+ 'pyspark3.k8s': 'deps/k8s',
'pyspark3.python.lib': 'lib',
'pyspark3.data': 'deps/data',
'pyspark3.licenses': 'deps/licenses',
@@ -238,6 +244,7 @@ def run(self):
package_data={
'pyspark3.jars': ['*.jar'],
'pyspark3.bin': ['*'],
+ 'pyspark3.k8s': ['*'],
'pyspark3.sbin': ['spark-config.sh', 'spark-daemon.sh',
'start-history-server.sh',
'stop-history-server.sh', ],
@@ -281,6 +288,7 @@ def run(self):
if _supports_symlinks():
os.remove(os.path.join(TEMP_PATH, "jars"))
os.remove(os.path.join(TEMP_PATH, "bin"))
+ os.remove(os.path.join(TEMP_PATH, "k8s"))
os.remove(os.path.join(TEMP_PATH, "sbin"))
os.remove(os.path.join(TEMP_PATH, "examples"))
os.remove(os.path.join(TEMP_PATH, "data"))
@@ -288,6 +296,7 @@ def run(self):
else:
rmtree(os.path.join(TEMP_PATH, "jars"))
rmtree(os.path.join(TEMP_PATH, "bin"))
+ rmtree(os.path.join(TEMP_PATH, "k8s"))
rmtree(os.path.join(TEMP_PATH, "sbin"))
rmtree(os.path.join(TEMP_PATH, "examples"))
rmtree(os.path.join(TEMP_PATH, "data"))
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 5e997b217644a..e9c2bec89ee55 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -79,6 +79,22 @@
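+    <!-- Official Kubernetes Java client, used by SparkKubernetesClientFactory to read the
+         kubeconfig and refresh OAuth tokens; its Jackson modules are excluded below. -->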
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>8.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.dataformat</groupId>
+          <artifactId>jackson-dataformat-yaml</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
<groupId>com.fasterxml.jackson.dataformat</groupId>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 459259f77796c..19ed7439c74df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -17,12 +17,17 @@
package org.apache.spark.deploy.k8s
import java.io.File
+import java.io.FileReader
+import java.util.Locale
import com.google.common.base.Charsets
import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, OAuthTokenProvider}
+import io.fabric8.kubernetes.client.Config.KUBERNETES_KUBECONFIG_FILE
import io.fabric8.kubernetes.client.Config.autoConfigure
-import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import io.fabric8.kubernetes.client.utils.{HttpClientUtils, Utils}
+import io.kubernetes.client.util.FilePersister
+import io.kubernetes.client.util.KubeConfig
import okhttp3.Dispatcher
import org.apache.spark.SparkConf
@@ -31,6 +36,16 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils
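+
+/**
+ * OAuthTokenProvider backed by the official Kubernetes Java client: the access token is read
+ * from the given kubeconfig file, and a FilePersister is registered so that any credentials
+ * the client refreshes are written back to the same file.
+ */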
+private[spark] class SparkOAuthTokenProvider(config: File) extends OAuthTokenProvider {
+ val kubeConfig = KubeConfig.loadKubeConfig(new FileReader(config))
+ val persister = new FilePersister(config)
+ kubeConfig.setPersistConfig(persister)
+
+ def getToken(): String = kubeConfig.getAccessToken()
+}
+
/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
* parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
@@ -38,6 +53,47 @@ import org.apache.spark.util.ThreadUtils
*/
private[spark] object SparkKubernetesClientFactory extends Logging {
+ /**
+ * Check whether the code is running inside a Kubernetes pod.
+ * @return true if the KUBERNETES_SERVICE_HOST environment variable is set and non-empty
+ */
+ def isOnKubernetes(): Boolean = {
+ val serviceHost = System.getenv("KUBERNETES_SERVICE_HOST")
+ return serviceHost != null && serviceHost.length > 0
+ }
+
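+ /**
+ * Resolve the current user's home directory: HOMEDRIVE/HOMEPATH or USERPROFILE on Windows,
+ * otherwise HOME, falling back to the user.home system property. Used to locate the default
+ * kubeconfig at ~/.kube/config.
+ */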
+ def getHomeDir(): String = {
+ val osName = System.getProperty("os.name").toLowerCase(Locale.ROOT)
+ if (osName.startsWith("win")) {
+ val homeDrive = System.getenv("HOMEDRIVE")
+ val homePath = System.getenv("HOMEPATH")
+ if (homeDrive != null && !homeDrive.isEmpty() && homePath != null && !homePath.isEmpty()) {
+ val homeDir = homeDrive + homePath
+ val f = new File(homeDir)
+ if (f.exists() && f.isDirectory()) {
+ return homeDir
+ }
+ }
+ val userProfile = System.getenv("USERPROFILE")
+ if (userProfile != null && !userProfile.isEmpty()) {
+ val f = new File(userProfile)
+ if (f.exists() && f.isDirectory()) {
+ return userProfile
+ }
+ }
+ }
+ val home = System.getenv("HOME")
+ if (home != null && !home.isEmpty()) {
+ val f = new File(home)
+ if (f.exists() && f.isDirectory()) {
+ return home
+ }
+ }
+
+ // Fall back to user.home; we should never really get here.
+ return System.getProperty("user.home", ".")
+ }
+
def createKubernetesClient(
master: String,
namespace: Option[String],
@@ -77,7 +133,23 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
// Start from an auto-configured config with the desired context
// Fabric 8 uses null to indicate that the users current context should be used so if no
// explicit setting pass null
- val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
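+ // When running outside the cluster, resolve the kubeconfig file explicitly and attach a
+ // SparkOAuthTokenProvider backed by it, so API requests use (and persist) the possibly
+ // refreshed token from that file rather than a token resolved once by auto-configuration.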
+ var builder: ConfigBuilder = new ConfigBuilder()
+ if (!isOnKubernetes()) {
+ // Locate the kubeconfig file.
+ var fileName = Utils.getSystemPropertyOrEnvVar(KUBERNETES_KUBECONFIG_FILE,
+ new File(getHomeDir(), ".kube" + File.separator + "config").toString())
+ // If the system property/env var lists multiple files, take the first one; the list
+ // separator depends on the platform we are running on (':' on Linux, ';' on Windows).
+ val fileNames = fileName.split(File.pathSeparator)
+ if (fileNames.length > 1) {
+ fileName = fileNames(0)
+ }
+ val kubeConfigFile = new File(fileName)
+
+ builder = new ConfigBuilder(autoConfigure(null))
+ .withOauthTokenProvider(new SparkOAuthTokenProvider(kubeConfigFile))
+ }
+
+ val config = builder
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index f722471906bfb..814e2d83930e9 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -16,8 +16,7 @@
# limitations under the License.
#
-# echo commands to the terminal output
-set -ex
+set -e
# Check whether there is a passwd entry for the container UID
myuid=$(id -u)
@@ -96,10 +95,9 @@ case "$1" in
;;
*)
- echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..."
CMD=("$@")
;;
esac
# Execute the container CMD under tini for better hygiene
-exec /usr/bin/tini -s -- "${CMD[@]}"
+exec /sbin/tini -s -- "${CMD[@]}"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 7fe3263630820..e72195922b408 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -213,7 +213,7 @@ object EvaluatePython {
}
}
- private val module = "pyspark.sql.types"
+ private val module = "pyspark3.sql.types"
/**
* Pickler for StructType