diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fa86da9ae9669..3de080b57e3c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -308,21 +308,22 @@ private[spark] class SparkSubmit extends Logging {
       args.ivySettingsPath)
 
     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      // In K8s client mode, when in the driver, add resolved jars early as we might need
-      // them at the submit time for artifact downloading.
-      // For example we might use the dependencies for downloading
-      // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
-      // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
-      if (isKubernetesClusterModeDriver) {
-        val loader = getSubmitClassLoader(sparkConf)
-        for (jar <- resolvedMavenCoordinates.split(",")) {
-          addJarToClasspath(jar, loader)
-        }
-      } else if (isKubernetesCluster) {
+      if (isKubernetesCluster) {
        // We need this in K8s cluster mode so that we can upload local deps
        // via the k8s application, like in cluster mode driver
        childClasspath ++= resolvedMavenCoordinates.split(",")
      } else {
+        // In K8s client mode, when in the driver, add resolved jars early as we might need
+        // them at the submit time for artifact downloading.
+        // For example we might use the dependencies for downloading
+        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
+        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+        if (isKubernetesClusterModeDriver) {
+          val loader = getSubmitClassLoader(sparkConf)
+          for (jar <- resolvedMavenCoordinates.split(",")) {
+            addJarToClasspath(jar, loader)
+          }
+        }
         args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
         if (args.isPython || isInternal(args.primaryResource)) {
           args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index f4a8556c71f6e..fa85a3b9b2c54 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -104,7 +104,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
 
   override def typeName: String = "matrix"
 
-  override def pyUDT: String = "pyspark.ml.linalg.MatrixUDT"
+  override def pyUDT: String = "pyspark3.ml.linalg.MatrixUDT"
 
   private[spark] override def asNullable: MatrixUDT = this
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 35bbaf5aa1ded..e63542b390129 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -69,7 +69,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
     }
   }
 
-  override def pyUDT: String = "pyspark.ml.linalg.VectorUDT"
+  override def pyUDT: String = "pyspark3.ml.linalg.VectorUDT"
 
   override def userClass: Class[Vector] = classOf[Vector]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
index da62f8518e363..8bed821f61fc7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
@@ -31,7 +31,7 @@ import org.apache.spark.mllib.api.python.SerDeBase
  */
 private[spark] object MLSerDe extends SerDeBase with Serializable {
 
-  override val PYSPARK_PACKAGE = "pyspark.ml"
+  override val PYSPARK_PACKAGE = "pyspark3.ml"
 
   // Pickler for DenseVector
   private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 57edc965112ef..92a2b46e1eff1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -256,7 +256,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
 
   override def typeName: String = "matrix"
 
-  override def pyUDT: String = "pyspark.mllib.linalg.MatrixUDT"
+  override def pyUDT: String = "pyspark3.mllib.linalg.MatrixUDT"
 
   private[spark] override def asNullable: MatrixUDT = this
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 9ed9dd0c88c9b..d6f714615d613 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -313,7 +313,7 @@ class VectorUDT extends UserDefinedType[Vector] {
     }
   }
 
-  override def pyUDT: String = "pyspark.mllib.linalg.VectorUDT"
+  override def pyUDT: String = "pyspark3.mllib.linalg.VectorUDT"
 
   override def userClass: Class[Vector] = classOf[Vector]
 
diff --git a/python/pyspark3/ml/linalg/__init__.py b/python/pyspark3/ml/linalg/__init__.py
index c6570ca4c346e..ed0db9ad31f2a 100644
--- a/python/pyspark3/ml/linalg/__init__.py
+++ b/python/pyspark3/ml/linalg/__init__.py
@@ -134,7 +134,7 @@ def sqlType(cls):
 
     @classmethod
     def module(cls):
-        return "pyspark.ml.linalg"
+        return "pyspark3.ml.linalg"
 
     @classmethod
     def scalaUDT(cls):
@@ -184,7 +184,7 @@ def sqlType(cls):
 
     @classmethod
     def module(cls):
-        return "pyspark.ml.linalg"
+        return "pyspark3.ml.linalg"
 
     @classmethod
     def scalaUDT(cls):
diff --git a/python/pyspark3/ml/util.py b/python/pyspark3/ml/util.py
index a55ba0843624b..053289e0ea14e 100644
--- a/python/pyspark3/ml/util.py
+++ b/python/pyspark3/ml/util.py
@@ -297,7 +297,7 @@ def _java_loader_class(cls, clazz):
         implementation replaces "pyspark" by "org.apache.spark" in
         the Python full class name.
         """
-        java_package = clazz.__module__.replace("pyspark", "org.apache.spark")
+        java_package = clazz.__module__.replace("pyspark3", "org.apache.spark")
         if clazz.__name__ in ("Pipeline", "PipelineModel"):
             # Remove the last package name "pipeline" for Pipeline and PipelineModel.
java_package = ".".join(java_package.split(".")[0:-1]) @@ -573,7 +573,7 @@ def getAndSetParams(instance, metadata, skipParams=None): @staticmethod def isPythonParamsInstance(metadata): - return metadata['class'].startswith('pyspark.ml.') + return metadata['class'].startswith('pyspark3.ml.') or metadata['class'].startswith('pyspark.ml') @staticmethod def loadParamsInstance(path, sc): @@ -584,8 +584,10 @@ def loadParamsInstance(path, sc): metadata = DefaultParamsReader.loadMetadata(path, sc) if DefaultParamsReader.isPythonParamsInstance(metadata): pythonClassName = metadata['class'] + if pythonClassName.split('.')[0] == 'pyspark': + pythonClassName = pythonClassName.replace("pyspark", "pyspark3") else: - pythonClassName = metadata['class'].replace("org.apache.spark", "pyspark") + pythonClassName = metadata['class'].replace("org.apache.spark", "pyspark3") py_type = DefaultParamsReader.__get_class(pythonClassName) instance = py_type.load(path) return instance diff --git a/python/pyspark3/ml/wrapper.py b/python/pyspark3/ml/wrapper.py index 2e2013c59a49c..76eeb84c2c8e3 100644 --- a/python/pyspark3/ml/wrapper.py +++ b/python/pyspark3/ml/wrapper.py @@ -244,7 +244,7 @@ def __get_class(clazz): for comp in parts[1:]: m = getattr(m, comp) return m - stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark") + stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark3") # Generate a default new instance from the stage_name class. py_type = __get_class(stage_name) if issubclass(py_type, JavaParams): diff --git a/python/pyspark3/mllib/linalg/__init__.py b/python/pyspark3/mllib/linalg/__init__.py index 772cdaf4ba98e..af77c45cb998d 100644 --- a/python/pyspark3/mllib/linalg/__init__.py +++ b/python/pyspark3/mllib/linalg/__init__.py @@ -137,7 +137,7 @@ def sqlType(cls): @classmethod def module(cls): - return "pyspark.mllib.linalg" + return "pyspark3.mllib.linalg" @classmethod def scalaUDT(cls): @@ -187,7 +187,7 @@ def sqlType(cls): @classmethod def module(cls): - return "pyspark.mllib.linalg" + return "pyspark3.mllib.linalg" @classmethod def scalaUDT(cls): diff --git a/python/pyspark3/mllib/util.py b/python/pyspark3/mllib/util.py index bce6d085af03e..a560557220af3 100644 --- a/python/pyspark3/mllib/util.py +++ b/python/pyspark3/mllib/util.py @@ -511,7 +511,7 @@ def _java_loader_class(cls): implementation replaces "pyspark" by "org.apache.spark" in the Python full class name. """ - java_package = cls.__module__.replace("pyspark", "org.apache.spark") + java_package = cls.__module__.replace("pyspark3", "org.apache.spark") return ".".join([java_package, cls.__name__]) @classmethod diff --git a/python/pyspark3/version.py b/python/pyspark3/version.py index 8a4b6fbe42965..28cdba7ad5ad3 100644 --- a/python/pyspark3/version.py +++ b/python/pyspark3/version.py @@ -16,4 +16,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "3.1.2+affirm4" +__version__ = "3.1.2+affirm9" diff --git a/python/setup.py b/python/setup.py index 6b0441d4a035c..2f31b70c3c043 100755 --- a/python/setup.py +++ b/python/setup.py @@ -76,6 +76,7 @@ USER_SCRIPTS_PATH = os.path.join(SPARK_HOME, "sbin") DATA_PATH = os.path.join(SPARK_HOME, "data") LICENSES_PATH = os.path.join(SPARK_HOME, "licenses") +DOCKER_PATH = os.path.join(SPARK_HOME, "resource-managers/kubernetes/docker/src/main/dockerfiles/spark") SCRIPTS_TARGET = os.path.join(TEMP_PATH, "bin") USER_SCRIPTS_TARGET = os.path.join(TEMP_PATH, "sbin") @@ -83,6 +84,7 @@ EXAMPLES_TARGET = os.path.join(TEMP_PATH, "examples") DATA_TARGET = os.path.join(TEMP_PATH, "data") LICENSES_TARGET = os.path.join(TEMP_PATH, "licenses") +DOCKER_TARGET = os.path.join(TEMP_PATH, "k8s") # Check and see if we are under the spark path in which case we need to build the symlink farm. # This is important because we only want to build the symlink farm while under Spark otherwise we @@ -167,6 +169,7 @@ def run(self): os.symlink(EXAMPLES_PATH, EXAMPLES_TARGET) os.symlink(DATA_PATH, DATA_TARGET) os.symlink(LICENSES_PATH, LICENSES_TARGET) + os.symlink(DOCKER_PATH, DOCKER_TARGET) else: # For windows fall back to the slower copytree copytree(JARS_PATH, JARS_TARGET) @@ -175,6 +178,7 @@ def run(self): copytree(EXAMPLES_PATH, EXAMPLES_TARGET) copytree(DATA_PATH, DATA_TARGET) copytree(LICENSES_PATH, LICENSES_TARGET) + copytree(DOCKER_PATH, DOCKER_TARGET) else: # If we are not inside of SPARK_HOME verify we have the required symlink farm if not os.path.exists(JARS_TARGET): @@ -224,12 +228,14 @@ def run(self): 'pyspark3.data', 'pyspark3.licenses', 'pyspark3.resource', + 'pyspark3.k8s', 'pyspark3.examples.src.main.python'], include_package_data=True, package_dir={ 'pyspark3.jars': 'deps/jars', 'pyspark3.bin': 'deps/bin', 'pyspark3.sbin': 'deps/sbin', + 'pyspark3.k8s': 'deps/k8s', 'pyspark3.python.lib': 'lib', 'pyspark3.data': 'deps/data', 'pyspark3.licenses': 'deps/licenses', @@ -238,6 +244,7 @@ def run(self): package_data={ 'pyspark3.jars': ['*.jar'], 'pyspark3.bin': ['*'], + 'pyspark3.k8s': ['*'], 'pyspark3.sbin': ['spark-config.sh', 'spark-daemon.sh', 'start-history-server.sh', 'stop-history-server.sh', ], @@ -281,6 +288,7 @@ def run(self): if _supports_symlinks(): os.remove(os.path.join(TEMP_PATH, "jars")) os.remove(os.path.join(TEMP_PATH, "bin")) + os.remove(os.path.join(TEMP_PATH, "k8s")) os.remove(os.path.join(TEMP_PATH, "sbin")) os.remove(os.path.join(TEMP_PATH, "examples")) os.remove(os.path.join(TEMP_PATH, "data")) @@ -288,6 +296,7 @@ def run(self): else: rmtree(os.path.join(TEMP_PATH, "jars")) rmtree(os.path.join(TEMP_PATH, "bin")) + rmtree(os.path.join(TEMP_PATH, "k8s")) rmtree(os.path.join(TEMP_PATH, "sbin")) rmtree(os.path.join(TEMP_PATH, "examples")) rmtree(os.path.join(TEMP_PATH, "data")) diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 5e997b217644a..e9c2bec89ee55 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -79,6 +79,22 @@ + + io.kubernetes + client-java + 8.0.0 + + + com.fasterxml.jackson.core + * + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + com.fasterxml.jackson.dataformat diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 459259f77796c..19ed7439c74df 100644 
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -17,12 +17,17 @@
 package org.apache.spark.deploy.k8s
 
 import java.io.File
+import java.io.FileReader
+import java.util.Locale
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, OAuthTokenProvider}
+import io.fabric8.kubernetes.client.Config.KUBERNETES_KUBECONFIG_FILE
 import io.fabric8.kubernetes.client.Config.autoConfigure
-import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import io.fabric8.kubernetes.client.utils.{HttpClientUtils, Utils}
+import io.kubernetes.client.util.FilePersister
+import io.kubernetes.client.util.KubeConfig
 import okhttp3.Dispatcher
 
 import org.apache.spark.SparkConf
@@ -31,6 +36,16 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.ThreadUtils
 
+private[spark] class SparkOAuthTokenProvider(config: File) extends OAuthTokenProvider {
+  val kubeConfig = KubeConfig.loadKubeConfig(new FileReader(config))
+  val persister = new FilePersister(config)
+  kubeConfig.setPersistConfig(persister)
+
+  def getToken(): String = {
+    return kubeConfig.getAccessToken()
+  }
+}
+
 /**
  * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
  * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
@@ -38,6 +53,47 @@ import org.apache.spark.util.ThreadUtils
  */
 private[spark] object SparkKubernetesClientFactory extends Logging {
 
+  /**
+   * Check if the code is being run from within kubernetes.
+   * @return true if the KUBERNETES_SERVICE_HOST environment variable is set.
+   */
+  def isOnKubernetes(): Boolean = {
+    val serviceHost = System.getenv("KUBERNETES_SERVICE_HOST")
+    return serviceHost != null && serviceHost.length > 0
+  }
+
+  def getHomeDir(): String = {
+    val osName = System.getProperty("os.name").toLowerCase(Locale.ROOT)
+    if (osName.startsWith("win")) {
+      val homeDrive = System.getenv("HOMEDRIVE")
+      val homePath = System.getenv("HOMEPATH")
+      if (homeDrive != null && !homeDrive.isEmpty() && homePath != null && !homePath.isEmpty()) {
+        val homeDir = homeDrive + homePath
+        val f = new File(homeDir)
+        if (f.exists() && f.isDirectory()) {
+          return homeDir
+        }
+      }
+      val userProfile = System.getenv("USERPROFILE")
+      if (userProfile != null && !userProfile.isEmpty()) {
+        val f = new File(userProfile)
+        if (f.exists() && f.isDirectory()) {
+          return userProfile
+        }
+      }
+    }
+    val home = System.getenv("HOME")
+    if (home != null && !home.isEmpty()) {
+      val f = new File(home)
+      if (f.exists() && f.isDirectory()) {
+        return home
+      }
+    }
+
+    // Fall back to user.home; we should never really get here
+    return System.getProperty("user.home", ".")
+  }
+
   def createKubernetesClient(
       master: String,
       namespace: Option[String],
@@ -77,7 +133,23 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
     // Start from an auto-configured config with the desired context
     // Fabric 8 uses null to indicate that the users current context should be used so if no
     // explicit setting pass null
-    val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
+    var builder: ConfigBuilder = new ConfigBuilder()
+    if (!isOnKubernetes()) {
+      // Get the kubeconfig file
+      var fileName = Utils.getSystemPropertyOrEnvVar(KUBERNETES_KUBECONFIG_FILE, new File(getHomeDir(), ".kube" + File.separator + "config").toString())
+      // If the system property/env var contains multiple files, take the first one based on
+      // the path separator of the environment we are running in (':' on Linux, ';' on Windows)
+      val fileNames = fileName.split(File.pathSeparator)
+      if (fileNames.length > 1) {
+        fileName = fileNames(0)
+      }
+      val kubeConfigFile = new File(fileName)
+
+      builder = new ConfigBuilder(autoConfigure(null))
+        .withOauthTokenProvider(new SparkOAuthTokenProvider(kubeConfigFile))
+    }
+
+    val config = builder
       .withApiVersion("v1")
       .withMasterUrl(master)
       .withWebsocketPingInterval(0)
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index f722471906bfb..814e2d83930e9 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -16,8 +16,7 @@
 # limitations under the License.
 #
 
-# echo commands to the terminal output
-set -ex
+set -e
 
 # Check whether there is a passwd entry for the container UID
 myuid=$(id -u)
@@ -96,10 +95,9 @@ case "$1" in
     ;;
 
   *)
-    echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..."
CMD=("$@") ;; esac # Execute the container CMD under tini for better hygiene -exec /usr/bin/tini -s -- "${CMD[@]}" +exec /sbin/tini -s -- "${CMD[@]}" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 7fe3263630820..e72195922b408 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -213,7 +213,7 @@ object EvaluatePython { } } - private val module = "pyspark.sql.types" + private val module = "pyspark3.sql.types" /** * Pickler for StructType