diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala index c6c99b5d7..0bfa83cb8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala @@ -16,34 +16,38 @@ package com.nvidia.spark.rapids.tool.profiling +import org.apache.spark.sql.rapids.tool.ToolUtils + case class ApplicationSummaryInfo( - val appInfo: Seq[AppInfoProfileResults], - val dsInfo: Seq[DataSourceProfileResult], - val execInfo: Seq[ExecutorInfoProfileResult], - val jobInfo: Seq[JobInfoProfileResult], - val rapidsProps: Seq[RapidsPropertyProfileResult], - val rapidsJar: Seq[RapidsJarProfileResult], - val sqlMetrics: Seq[SQLAccumProfileResults], - val jsMetAgg: Seq[JobStageAggTaskMetricsProfileResult], - val sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult], - val durAndCpuMet: Seq[SQLDurationExecutorTimeProfileResult], - val skewInfo: Seq[ShuffleSkewProfileResult], - val failedTasks: Seq[FailedTaskProfileResults], - val failedStages: Seq[FailedStagesProfileResults], - val failedJobs: Seq[FailedJobsProfileResults], - val removedBMs: Seq[BlockManagerRemovedProfileResult], - val removedExecutors: Seq[ExecutorsRemovedProfileResult], - val unsupportedOps: Seq[UnsupportedOpsProfileResult], - val sparkProps: Seq[RapidsPropertyProfileResult], - val sqlStageInfo: Seq[SQLStageInfoProfileResult], - val wholeStage: Seq[WholeStageCodeGenResults], - val maxTaskInputBytesRead: Seq[SQLMaxTaskInputSizes], - val appLogPath: Seq[AppLogPathProfileResults], - val ioMetrics: Seq[IOAnalysisProfileResult]) + appInfo: Seq[AppInfoProfileResults], + dsInfo: Seq[DataSourceProfileResult], + execInfo: Seq[ExecutorInfoProfileResult], + jobInfo: Seq[JobInfoProfileResult], + rapidsProps: Seq[RapidsPropertyProfileResult], + rapidsJar: Seq[RapidsJarProfileResult], + sqlMetrics: Seq[SQLAccumProfileResults], + jsMetAgg: Seq[JobStageAggTaskMetricsProfileResult], + sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult], + durAndCpuMet: Seq[SQLDurationExecutorTimeProfileResult], + skewInfo: Seq[ShuffleSkewProfileResult], + failedTasks: Seq[FailedTaskProfileResults], + failedStages: Seq[FailedStagesProfileResults], + failedJobs: Seq[FailedJobsProfileResults], + removedBMs: Seq[BlockManagerRemovedProfileResult], + removedExecutors: Seq[ExecutorsRemovedProfileResult], + unsupportedOps: Seq[UnsupportedOpsProfileResult], + sparkProps: Seq[RapidsPropertyProfileResult], + sqlStageInfo: Seq[SQLStageInfoProfileResult], + wholeStage: Seq[WholeStageCodeGenResults], + maxTaskInputBytesRead: Seq[SQLMaxTaskInputSizes], + appLogPath: Seq[AppLogPathProfileResults], + ioMetrics: Seq[IOAnalysisProfileResult], + sysProps: Seq[RapidsPropertyProfileResult]) trait AppInfoPropertyGetter { def getSparkProperty(propKey: String): Option[String] def getRapidsProperty(propKey: String): Option[String] + def getSystemProperty(propKey: String): Option[String] def getProperty(propKey: String): Option[String] def getSparkVersion: Option[String] def getRapidsJars: Seq[String] @@ -76,7 +80,16 @@ class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter def isAppInfoAvailable = false override def getSparkProperty(propKey: String): Option[String] = None override def getRapidsProperty(propKey: String): Option[String] = None - override def getProperty(propKey: String): Option[String] = None + override def 
getSystemProperty(propKey: String): Option[String] = None + override def getProperty(propKey: String): Option[String] = { + if (propKey.startsWith(ToolUtils.PROPS_RAPIDS_KEY_PREFIX)) { + getRapidsProperty(propKey) + } else if (propKey.startsWith("spark")){ + getSparkProperty(propKey) + } else { + getSystemProperty(propKey) + } + } override def getSparkVersion: Option[String] = None override def getMaxInput: Double = 0.0 override def getMeanInput: Double = 0.0 @@ -99,7 +112,7 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo) extends AppSummaryInfoBaseProvider { private lazy val distinctLocations = app.dsInfo.groupBy(_.location) - override def isAppInfoAvailable = Option(app).isDefined + override def isAppInfoAvailable: Boolean = Option(app).isDefined private def findPropertyInProfPropertyResults( key: String, @@ -118,12 +131,8 @@ class SingleAppSummaryInfoProvider(val app: ApplicationSummaryInfo) findPropertyInProfPropertyResults(propKey, app.rapidsProps) } - override def getProperty(propKey: String): Option[String] = { - if (propKey.startsWith("spark.rapids")) { - getRapidsProperty(propKey) - } else { - getSparkProperty(propKey) - } + override def getSystemProperty(propKey: String): Option[String] = { + findPropertyInProfPropertyResults(propKey, app.sysProps) } override def getSparkVersion: Option[String] = { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala index 5465000c3..885d991ab 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala @@ -34,7 +34,7 @@ import org.yaml.snakeyaml.representer.Representer import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.{GpuTypes, ToolUtils} -import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil +import org.apache.spark.sql.rapids.tool.util.{StringUtils, WebCrawlerUtil} /** * A wrapper class that stores all the GPU properties. @@ -222,9 +222,9 @@ class RecommendationEntry(val name: String, propValue match { case None => None case Some(value) => - if (AutoTuner.containsMemoryUnits(value)) { + if (StringUtils.isMemorySize(value)) { // if it is memory return the bytes unit - Some(s"${AutoTuner.convertFromHumanReadableSize(value)}") + Some(s"${StringUtils.convertMemorySizeToBytes(value)}") } else { propValue } @@ -466,7 +466,7 @@ class AutoTuner( */ def calcGpuConcTasks(): Long = { Math.min(MAX_CONC_GPU_TASKS, - convertToMB(clusterProps.gpu.memory) / DEF_GPU_MEM_PER_TASK_MB) + StringUtils.convertToMB(clusterProps.gpu.memory) / DEF_GPU_MEM_PER_TASK_MB) } /** @@ -477,7 +477,7 @@ class AutoTuner( private def calcAvailableMemPerExec(): Double = { // account for system overhead val usableWorkerMem = - Math.max(0, convertToMB(clusterProps.system.memory) - DEF_SYSTEM_RESERVE_MB) + Math.max(0, StringUtils.convertToMB(clusterProps.system.memory) - DEF_SYSTEM_RESERVE_MB) // clusterProps.gpu.getCount can never be 0. 
This is verified in processPropsAndCheck() (1.0 * usableWorkerMem) / clusterProps.gpu.getCount } @@ -608,6 +608,7 @@ class AutoTuner( recommendShufflePartitions() recommendGCProperty() recommendClassPathEntries() + recommendSystemProperties() } def getShuffleManagerClassName() : Option[String] = { @@ -735,22 +736,34 @@ class AutoTuner( } val autoBroadcastJoinThresholdProperty = - getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(convertToMB) + getPropertyValue("spark.sql.adaptive.autoBroadcastJoinThreshold").map(StringUtils.convertToMB) if (autoBroadcastJoinThresholdProperty.isEmpty) { appendComment("'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set.") } else if (autoBroadcastJoinThresholdProperty.get > - convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) { + StringUtils.convertToMB(AQE_AUTOBROADCAST_JOIN_THRESHOLD)) { appendComment("Setting 'spark.sql.adaptive.autoBroadcastJoinThreshold' > " + s"$AQE_AUTOBROADCAST_JOIN_THRESHOLD could lead to performance\n" + " regression. Should be set to a lower number.") } } + /** + * Checks the system properties and give feedback to the user. + * For example file.encoding=UTF-8 is required for some ops like GpuRegEX. + */ + private def recommendSystemProperties(): Unit = { + appInfoProvider.getSystemProperty("file.encoding").collect { + case encoding if !ToolUtils.isFileEncodingRecommended(encoding) => + appendComment(s"file.encoding should be [${ToolUtils.SUPPORTED_ENCODINGS.mkString}]" + + " because GPU only supports the charset when using some expressions.") + } + } + /** * Check the class path entries with the following rules: * 1- If ".*rapids-4-spark.*jar" is missing then add a comment that the latest jar should be * included in the classpath unless it is part of the spark - * 2- If there are more than 1 entry for ".*rapids-4-spark.*jar", then add a comment that the + * 2- If there are more than 1 entry for ".*rapids-4-spark.*jar", then add a comment that * there should be only 1 jar in the class path. * 3- If there are cudf jars, ignore that for now. * 4- If there is a new release recommend that to the user @@ -811,7 +824,7 @@ class AutoTuner( private def calculateMaxPartitionBytes(maxPartitionBytes: String): String = { // AutoTuner only supports a single app right now, so we get whatever value is here val inputBytesMax = appInfoProvider.getMaxInput / 1024 / 1024 - val maxPartitionBytesNum = convertToMB(maxPartitionBytes) + val maxPartitionBytesNum = StringUtils.convertToMB(maxPartitionBytes) if (inputBytesMax == 0.0) { maxPartitionBytesNum.toString } else { @@ -860,7 +873,7 @@ class AutoTuner( if (isCalculationEnabled("spark.sql.files.maxPartitionBytes")) { calculateMaxPartitionBytes(maxPartitionProp) } else { - s"${convertToMB(maxPartitionProp)}" + s"${StringUtils.convertToMB(maxPartitionProp)}" } appendRecommendationForMemoryMB("spark.sql.files.maxPartitionBytes", recommended) } @@ -1221,59 +1234,6 @@ object AutoTuner extends Logging { } } - /** - * Converts size from human readable to bytes. - * Eg, "4m" -> 4194304. 
- */ - def convertFromHumanReadableSize(size: String): Long = { - val sizesArr = size.toLowerCase.split("(?=[a-z])") - val sizeNum = sizesArr(0).toDouble - if (sizesArr.length > 1) { - val sizeUnit = sizesArr(1) - assert(SUPPORTED_SIZE_UNITS.contains(sizeUnit), s"$size is not a valid human readable size") - (sizeNum * Math.pow(1024, SUPPORTED_SIZE_UNITS.indexOf(sizeUnit))).toLong - } else { - sizeNum.toLong - } - } - - def containsMemoryUnits(size: String): Boolean = { - val sizesArr = size.toLowerCase.split("(?=[a-z])") - if (sizesArr.length > 1) { - SUPPORTED_SIZE_UNITS.contains(sizesArr(1)) - } else { - false - } - } - - def convertToMB(size: String): Long = { - convertFromHumanReadableSize(size) / (1024 * 1024) - } - - /** - * Converts size from bytes to human readable. - * Eg, 4194304 -> "4m", 633554 -> "618.70k". - */ - def convertToHumanReadableSize(size: Long): String = { - if (size < 0L) { - return "0b" - } - - val unitIndex = (Math.log10(size) / Math.log10(1024)).toInt - assert(unitIndex < SUPPORTED_SIZE_UNITS.size, - s"$size is too large to convert to human readable size") - - val sizeNum = size * 1.0/Math.pow(1024, unitIndex) - val sizeUnit = SUPPORTED_SIZE_UNITS(unitIndex) - - // If sizeNum is an integer omit fraction part - if ((sizeNum % 1) == 0) { - f"${sizeNum.toLong}$sizeUnit" - } else { - f"$sizeNum%.2f$sizeUnit" - } - } - /** * Given the spark property "spark.master", it checks whether memoryOverhead should be * enabled/disabled. For Spark Standalone Mode, memoryOverhead property is skipped. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index 15b68a46f..55ce1037c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.ToolTextFileWriter import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile -import org.apache.spark.sql.rapids.tool.SQLMetricsStats +import org.apache.spark.sql.rapids.tool.{SQLMetricsStats, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo case class StageMetrics(numTasks: Int, duration: String) @@ -53,7 +53,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { AppLogPathProfileResults(app.index, a.appName, a.appId, app.eventLogPath) } if (allRows.nonEmpty) { - allRows.sortBy(cols => (cols.appIndex)) + allRows.sortBy(cols => cols.appIndex) } else { Seq.empty } @@ -63,12 +63,17 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { def getRapidsJARInfo: Seq[RapidsJarProfileResult] = { val allRows = apps.flatMap { app => if (app.gpuMode) { - // Look for rapids-4-spark and cuDF jar - val rapidsJar = app.classpathEntries.filterKeys(_ matches ".*rapids-4-spark.*jar") - val cuDFJar = app.classpathEntries.filterKeys(_ matches ".*cudf.*jar") - val cols = (rapidsJar.keys ++ cuDFJar.keys).toSeq - val rowsWithAppindex = cols.map(jar => RapidsJarProfileResult(app.index, jar)) - rowsWithAppindex + // Look for rapids-4-spark and cuDF jar in classPathEntries + val rapidsJars = app.classpathEntries.filterKeys(_ matches ToolUtils.RAPIDS_JAR_REGEX.regex) + if (rapidsJars.nonEmpty) { + val cols = rapidsJars.keys.toSeq + cols.map(jar => RapidsJarProfileResult(app.index, jar)) + } else { + // Look for the rapids-4-spark and cuDF jars in Spark Properties + 
          ToolUtils.extractRAPIDSJarsFromProps(app.sparkProperties).map {
+            jar => RapidsJarProfileResult(app.index, jar)
+          }
+        }
     } else {
       Seq.empty
     }
   }
@@ -203,25 +208,30 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
     }
   }
 
-  // Print RAPIDS related or all Spark Properties
-  // This table is inverse of the other tables where the row keys are
-  // property keys and the columns are the application values. So
-  // column1 would be all the key values for app index 1.
-  def getProperties(rapidsOnly: Boolean): Seq[RapidsPropertyProfileResult] = {
+  /**
+   * Print the RAPIDS related properties, all Spark properties, or the system properties,
+   * depending on the propSource.
+   * Note that RAPIDS related properties do not necessarily start with the 'spark.rapids' prefix.
+   * This table is inverse of the other tables where the row keys are property keys and the columns
+   * are the application values. So column1 would be all the key values for app index 1.
+   * @param propSource defines which type of properties to retrieve.
+   *                   It can be: rapids, spark, or system
+   * @return List of properties relevant to the source.
+   */
+  private def getProperties(propSource: String): Seq[RapidsPropertyProfileResult] = {
     val outputHeaders = ArrayBuffer("propertyName")
     val props = HashMap[String, ArrayBuffer[String]]()
     var numApps = 0
     apps.foreach { app =>
       numApps += 1
       outputHeaders += s"appIndex_${app.index}"
-      val propsToKeep = if (rapidsOnly) {
-        app.sparkProperties.filterKeys { key =>
-          key.startsWith("spark.rapids") || key.startsWith("spark.executorEnv.UCX") ||
-            key.startsWith("spark.shuffle.manager") || key.equals("spark.shuffle.service.enabled")
-        }
-      } else {
+      val propsToKeep = if (propSource.equals("rapids")) {
+        app.sparkProperties.filterKeys { ToolUtils.isRapidsPropKey(_) }
+      } else if (propSource.equals("spark")) {
         // remove the rapids related ones
-        app.sparkProperties.filterKeys(key => !(key.contains("spark.rapids")))
+        app.sparkProperties.filterKeys(key => !key.contains(ToolUtils.PROPS_RAPIDS_KEY_PREFIX))
+      } else {
+        // get the system properties
+        app.systemProperties
       }
       CollectInformation.addNewProps(propsToKeep, props, numApps)
     }
@@ -234,6 +244,16 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
     }
   }
 
+  def getRapidsProperties: Seq[RapidsPropertyProfileResult] = {
+    getProperties("rapids")
+  }
+  def getSparkProperties: Seq[RapidsPropertyProfileResult] = {
+    getProperties("spark")
+  }
+  def getSystemProperties: Seq[RapidsPropertyProfileResult] = {
+    getProperties("system")
+  }
+
   // Print SQL whole stage code gen mapping
   def getWholeStageCodeGenMapping: Seq[WholeStageCodeGenResults] = {
     val allWholeStages = apps.flatMap { app =>
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala
index abe055f9e..12fe1bc4e 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -33,13 +33,6 @@ object ProfileUtils { .getOrCreate() } - // Convert a null-able String to Option[Long] - def stringToLong(in: String): Option[Long] = try { - Some(in.toLong) - } catch { - case _: NumberFormatException => None - } - // Convert Option[Long] to String def optionLongToString(in: Option[Long]): String = try { in.get.toString @@ -48,7 +41,7 @@ object ProfileUtils { } // Check if the job/stage is GPU mode is on - def isPluginEnabled(properties: collection.mutable.Map[String, String]): Boolean = { + def isPluginEnabled(properties: collection.Map[String, String]): Boolean = { ToolUtils.isPluginEnabled(properties.toMap) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 4d2f2fea9..5b0f6cc47 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -338,8 +338,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea val execInfo = collect.getExecutorInfo val jobInfo = collect.getJobInfo val sqlStageInfo = collect.getSQLToStage - val rapidsProps = collect.getProperties(rapidsOnly = true) - val sparkProps = collect.getProperties(rapidsOnly = false) + val rapidsProps = collect.getRapidsProperties + val sparkProps = collect.getSparkProperties + val systemProps = collect.getSystemProperties val rapidsJar = collect.getRapidsJARInfo val sqlMetrics = collect.getSQLPlanMetrics val wholeStage = collect.getWholeStageCodeGenMapping @@ -406,7 +407,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea rapidsJar, sqlMetrics, jsMetAgg, sqlTaskAggMetrics, durAndCpuMet, skewInfo, failedTasks, failedStages, failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps, sqlStageInfo, wholeStage, maxTaskInputInfo, - appLogPath, ioAnalysisMetrics), compareRes) + appLogPath, ioAnalysisMetrics, systemProps), compareRes) } /** @@ -437,16 +438,19 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea val sums = if (outputCombined) { // the properties table here has the column names as the app indexes so we have to // handle special - def combineProps(rapidsOnly: Boolean, + + def combineProps(propSource: String, sums: Seq[ApplicationSummaryInfo]): Seq[RapidsPropertyProfileResult] = { var numApps = 0 val props = HashMap[String, ArrayBuffer[String]]() val outputHeaders = ArrayBuffer("propertyName") sums.foreach { app => - val inputProps = if (rapidsOnly) { + val inputProps = if (propSource.equals("rapids")) { app.rapidsProps - } else { + } else if (propSource.equals("spark")) { app.sparkProps + } else { // this is for system properties + app.sysProps } if (inputProps.nonEmpty) { numApps += 1 @@ -467,7 +471,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea appsSum.flatMap(_.dsInfo).sortBy(_.appIndex), appsSum.flatMap(_.execInfo).sortBy(_.appIndex), appsSum.flatMap(_.jobInfo).sortBy(_.appIndex), - combineProps(rapidsOnly=true, appsSum).sortBy(_.key), + combineProps("rapids", appsSum).sortBy(_.key), appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex), appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex), appsSum.flatMap(_.jsMetAgg).sortBy(_.appIndex), @@ -480,12 +484,13 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea appsSum.flatMap(_.removedBMs).sortBy(_.appIndex), appsSum.flatMap(_.removedExecutors).sortBy(_.appIndex), 
appsSum.flatMap(_.unsupportedOps).sortBy(_.appIndex), - combineProps(rapidsOnly=false, appsSum).sortBy(_.key), + combineProps("spark", appsSum).sortBy(_.key), appsSum.flatMap(_.sqlStageInfo).sortBy(_.duration)(Ordering[Option[Long]].reverse), appsSum.flatMap(_.wholeStage).sortBy(_.appIndex), appsSum.flatMap(_.maxTaskInputBytesRead).sortBy(_.appIndex), appsSum.flatMap(_.appLogPath).sortBy(_.appIndex), - appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex) + appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex), + combineProps("system", appsSum).sortBy(_.key) ) Seq(reduced) } else { @@ -503,6 +508,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea Some("Spark Rapids parameters")) profileOutputWriter.write("Spark Properties", app.sparkProps, Some("Spark Properties")) + profileOutputWriter.write("System Properties", app.sysProps, + Some("System Properties")) profileOutputWriter.write("Rapids Accelerator Jar and cuDF Jar", app.rapidsJar, Some("Rapids 4 Spark Jars")) profileOutputWriter.write("SQL Plan Metrics for Application", app.sqlMetrics, diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index e645bb763..3aafb4597 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -26,44 +26,69 @@ import scala.io.{Codec, Source} import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo} import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser} import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode -import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, DriverAccumCase, JobInfoClass, SQLExecutionInfoClass, StageInfoClass, TaskStageAccumCase} +import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, DriverAccumCase, JobInfoClass, ProfileUtils, SQLExecutionInfoClass, StageInfoClass, TaskStageAccumCase} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter} import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart, StageInfo} +import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart, SparkListenerLogStart, StageInfo} import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode} import org.apache.spark.sql.rapids.tool.qualification.MLFunctions -import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil +import org.apache.spark.sql.rapids.tool.util.{EventUtils, RapidsToolsConfUtil} import org.apache.spark.util.Utils // Handles updating and caching Spark Properties for a Spark application. -// Properties stored in this container can be accessed to make decision about -// certain analysis that depends on the context of the Spark properties. -// TODO: we need to migrate SparkProperties, GpuMode to this trait. +// Properties stored in this container can be accessed to make decision about certain analysis +// that depends on the context of the Spark properties. 
trait CacheableProps { + private val RETAINED_SYSTEM_PROPS = Set( + "file.encoding", "java.version", "os.arch", "os.name", + "os.version", "user.timezone") + // caches the spark-version from the eventlogs + var sparkVersion: String = "" + var gpuMode = false // A flag whether hive is enabled or not. Note that we assume that the // property is global to the entire application once it is set. a.k.a, it cannot be disabled // once it is was set to true. var hiveEnabled = false + var sparkProperties = Map[String, String]() + var classpathEntries = Map[String, String]() + // set the fileEncoding to UTF-8 by default + var systemProperties = Map[String, String]() + def handleEnvUpdateForCachedProps(event: SparkListenerEnvironmentUpdate): Unit = { - val sparkProperties = event.environmentDetails("Spark Properties").toMap + sparkProperties ++= event.environmentDetails("Spark Properties").toMap + classpathEntries ++= event.environmentDetails("Classpath Entries").toMap + + gpuMode ||= ProfileUtils.isPluginEnabled(sparkProperties) hiveEnabled ||= HiveParseHelper.isHiveEnabled(sparkProperties) + + // Update the properties if system environments are set. + // No need to capture all the properties in memory. We only capture important ones. + systemProperties ++= event.environmentDetails("System Properties").toMap.filterKeys( + RETAINED_SYSTEM_PROPS.contains(_)) } def handleJobStartForCachedProps(event: SparkListenerJobStart): Unit = { // TODO: we need to improve this in order to support per-job-level hiveEnabled ||= HiveParseHelper.isHiveEnabled(event.properties.asScala) } + + def handleLogStartForCachedProps(event: SparkListenerLogStart): Unit = { + sparkVersion = event.sparkVersion + } + + def isGPUModeEnabledForJob(event: SparkListenerJobStart): Boolean = { + gpuMode || ProfileUtils.isPluginEnabled(event.properties.asScala) + } } abstract class AppBase( val eventLogInfo: Option[EventLogInfo], val hadoopConf: Option[Configuration]) extends Logging with CacheableProps { - var sparkVersion: String = "" var appEndTime: Option[Long] = None // The data source information val dataSourceInfo: ArrayBuffer[DataSourceCase] = ArrayBuffer[DataSourceCase]() @@ -92,8 +117,6 @@ abstract class AppBase( var mlEventLogType = "" var pysparkLogFlag = false - var gpuMode = false - def getOrCreateStage(info: StageInfo): StageInfoClass = { val stage = stageIdToInfo.getOrElseUpdate((info.stageId, info.attemptNumber()), new StageInfoClass(info)) @@ -240,7 +263,7 @@ abstract class AppBase( // Do NOT use a while loop as it is much much slower. lines.find { line => totalNumEvents += 1 - ToolUtils.getEventFromJsonMethod(line) match { + EventUtils.getEventFromJsonMethod(line) match { case Some(e) => processEvent(e) case None => false } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala index 261875673..622b0b8c8 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -142,19 +142,15 @@ class AppFilterImpl( val validConfigsMap = keyValueConfigs.map(a => a(0) -> a(1)).toMap val configFilteredResult = userNameLogicFiltered.filter { appFilterReturnParameters => - appFilterReturnParameters.appInfo.sparkProperties.exists { sparkProp => - if (keyValueConfigs.nonEmpty || keysOnlyConfigs.nonEmpty) { - val allConfigs = sparkProp.configName // all configs from eventlog - val allConfigKeys = sparkProp.configName.keys.toList // for keys only confs - // Intersection of configs provided in the filter args with event log configs. - val commonConfigsKeys = validConfigsMap.keySet.intersect(allConfigs.keySet) - - commonConfigsKeys.filter { key => - allConfigs(key) == validConfigsMap(key) - }.nonEmpty || keysOnlyConfigs.intersect(allConfigKeys).nonEmpty - } else { - false - } + if (keyValueConfigs.nonEmpty || keysOnlyConfigs.nonEmpty) { + val appSparkProps = appFilterReturnParameters.appInfo.sparkProperties + val commonConfigsKeys = validConfigsMap.keySet.intersect(appSparkProps.keySet) + val allConfigKeys = appSparkProps.keys.toList // for keys only confs + commonConfigsKeys.filter { key => + appSparkProps(key) == validConfigsMap(key) + }.nonEmpty || keysOnlyConfigs.intersect(allConfigKeys).nonEmpty + } else { + false } } configFilteredResult diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala index 8bad42407..c4e14be3a 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala @@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.tool.profiling.{DriverAccumCase, JobInfoClass, Pr import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.ui._ -import org.apache.spark.sql.rapids.tool.util.EventUtils +import org.apache.spark.sql.rapids.tool.util.{EventUtils, StringUtils} abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener with Logging { @@ -121,7 +121,8 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi def doSparkListenerLogStart( app: T, event: SparkListenerLogStart): Unit = { - app.sparkVersion = event.sparkVersion + logDebug("Processing event: " + event.getClass) + app.handleLogStartForCachedProps(event) } def doSparkListenerSQLExecutionStart( @@ -188,9 +189,8 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi doSparkListenerSQLExecutionEnd(app, e) case e: SparkListenerDriverAccumUpdates => doSparkListenerDriverAccumUpdates(app, e) - case SparkListenerLogStart(sparkVersion) => - logInfo("on other event called") - app.sparkVersion = sparkVersion + case e: SparkListenerLogStart => + doSparkListenerLogStart(app, e) case _ => val wasResourceProfileAddedEvent = doSparkListenerResourceProfileAddedReflect(app, event) if (!wasResourceProfileAddedEvent) doOtherEvent(app, event) @@ -223,7 +223,10 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi def doSparkListenerEnvironmentUpdate( app: T, - event: SparkListenerEnvironmentUpdate): Unit = {} + event: SparkListenerEnvironmentUpdate): Unit = { + logDebug("Processing event: " + event.getClass) + app.handleEnvUpdateForCachedProps(event) + } override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { doSparkListenerEnvironmentUpdate(app, environmentUpdate) @@ -306,7 +309,7 @@ abstract class 
EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi logDebug("Processing event: " + event.getClass) app.handleJobStartForCachedProps(event) val sqlIDString = event.properties.getProperty("spark.sql.execution.id") - val sqlID = ProfileUtils.stringToLong(sqlIDString) + val sqlID = StringUtils.stringToLong(sqlIDString) sqlID.foreach(app.jobIdToSqlID(event.jobId) = _) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala index 36dc4081f..988bd3ff3 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/FilterAppInfo.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,6 @@ case class ApplicationStartInfo( startTime: Long, userName: String) -case class EnvironmentInfo(configName: Map[String, String]) - class FilterAppInfo( eventLogInfo: EventLogInfo, hadoopConf: Configuration) extends AppBase(Some(eventLogInfo), Some(hadoopConf)) { @@ -45,13 +43,10 @@ class FilterAppInfo( def doSparkListenerEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { logDebug("Processing event: " + event.getClass) - val envSparkProperties = event.environmentDetails("Spark Properties").toMap handleEnvUpdateForCachedProps(event) - sparkProperties = Some(EnvironmentInfo(envSparkProperties)) } var appStartInfo: Option[ApplicationStartInfo] = None - var sparkProperties: Option[EnvironmentInfo] = None // We are currently processing 2 events. This is used as counter to send true when both the // event are processed so that we can stop processing further events. var eventsToProcess: Int = 2 diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index b4198fb9a..0e93ce989 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ package org.apache.spark.sql.rapids.tool -import java.lang.reflect.InvocationTargetException - import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal @@ -31,6 +29,17 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.sql.DataFrame object ToolUtils extends Logging { + // List of recommended file-encodings on the GPUs. + val SUPPORTED_ENCODINGS = Seq("UTF-8") + // the prefix of keys defined by the RAPIDS plugin + val PROPS_RAPIDS_KEY_PREFIX = "spark.rapids" + // List of keys from sparkProperties that may point to RAPIDS jars. + // Note that we ignore "spark.yarn.secondary.jars" for now as it does not include a full path. + val POSSIBLE_JARS_PROPERTIES = Set("spark.driver.extraClassPath", + "spark.executor.extraClassPath", + "spark.yarn.dist.jars", + "spark.repl.local.jars") + val RAPIDS_JAR_REGEX = "(.*rapids-4-spark.*jar)|(.*cudf.*jar)".r // Add more entries to this lookup table as necessary. // There is no need to list all supported versions. 
@@ -47,71 +56,6 @@ object ToolUtils extends Logging { org.apache.spark.SPARK_VERSION } - lazy val getEventFromJsonMethod: - (String) => Option[org.apache.spark.scheduler.SparkListenerEvent] = { - // Spark 3.4 and Databricks changed the signature on sparkEventFromJson - // Note that it is preferred we use reflection rather than checking Spark-runtime - // because some vendors may back-port features. - val c = Class.forName("org.apache.spark.util.JsonProtocol") - val m = Try { - // versions prior to spark3.4 - c.getDeclaredMethod("sparkEventFromJson", classOf[org.json4s.JValue]) - } match { - case Success(a) => - (line: String) => - a.invoke(null, parse(line)).asInstanceOf[org.apache.spark.scheduler.SparkListenerEvent] - case Failure(_) => - // Spark3.4+ and databricks - val b = c.getDeclaredMethod("sparkEventFromJson", classOf[String]) - (line: String) => - b.invoke(null, line).asInstanceOf[org.apache.spark.scheduler.SparkListenerEvent] - } - // At this point, the method is already defined. - // Note that the Exception handling is moved within the method to make it easier - // to isolate the exception reason. - (line: String) => Try { - m.apply(line) - } match { - case Success(i) => Some(i) - case Failure(e) => - - e match { - case i: InvocationTargetException => - val targetEx = i.getTargetException - if (targetEx != null) { - targetEx match { - case j: com.fasterxml.jackson.core.io.JsonEOFException => - // Spark3.41+ embeds JsonEOFException in the InvocationTargetException - // We need to show a warning message instead of failing the entire app. - logWarning(s"Incomplete eventlog, ${j.getMessage}") - case k: com.fasterxml.jackson.core.JsonParseException => - // this is a parser error thrown by spark-3.4+ which indicates the log is - // malformed - throw k - case z: ClassNotFoundException if z.getMessage != null => - logWarning(s"ClassNotFoundException while parsing an event: ${z.getMessage}") - case t: Throwable => - // We do not want to swallow unknown exceptions so that we can handle later - logError(s"Unknown exception while parsing an event", t) - } - } else { - // Normally it should not happen that invocation target is null. - logError(s"Unknown exception while parsing an event", i) - } - case j: com.fasterxml.jackson.core.io.JsonEOFException => - // Note that JsonEOFException is child of JsonParseException - // In case the eventlog is incomplete (i.e., inprogress), we show a warning message - // because we do not want to cause the entire app to fail. - logWarning(s"Incomplete eventlog, ${j.getMessage}") - case k: com.fasterxml.jackson.core.JsonParseException => - // this is a parser error thrown by version prior to spark-3.4+ which indicates the - // log is malformed - throw k - } - None - } - } - def compareVersions(verA: String, verB: String): Int = { Try { val verObjA = new ComparableVersion(verA) @@ -282,6 +226,39 @@ object ToolUtils extends Logging { values: Seq[String], fileDelimiter: String = QualOutputWriter.CSV_DELIMITER): String = { renderTextField(values, ":", fileDelimiter) } + + /** + * Given a spark property key, this predicates checks if it is related to RAPIDS configurations. + * Note that, "related RAPIDS properties" do not always have 'spark.rapids' prefix. 
+   *
+   * @param pKey the spark property key
+   * @return True if it is directly related to RAPIDS
+   */
+  def isRapidsPropKey(pKey: String): Boolean = {
+    pKey.startsWith(PROPS_RAPIDS_KEY_PREFIX) || pKey.startsWith("spark.executorEnv.UCX") ||
+      pKey.startsWith("spark.shuffle.manager") || pKey.equals("spark.shuffle.service.enabled")
+  }
+
+  /**
+   * Checks if the given value is supported for all Ops or not.
+   * @param fileEncoding the value being read from the Application configs
+   * @return True if file encoding is supported
+   */
+  def isFileEncodingRecommended(fileEncoding: String): Boolean = {
+    fileEncoding.matches("(?i)utf-?8")
+  }
+
+  /**
+   * Collects the paths that point to RAPIDS jars from a map of properties.
+   * @param properties the map of properties holding the app configuration.
+   * @return set of unique file paths that match the RAPIDS jar patterns.
+   */
+  def extractRAPIDSJarsFromProps(properties: collection.Map[String, String]): Set[String] = {
+    properties.filterKeys(POSSIBLE_JARS_PROPERTIES.contains(_)).collect {
+      case (_, pVal) if pVal.matches(RAPIDS_JAR_REGEX.regex) =>
+        pVal.split(",").filter(_.matches(RAPIDS_JAR_REGEX.regex))
+    }.flatten.toSet
+  }
 }
 
 object JoinType {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
index b946cb2c5..f97c3119c 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -199,10 +199,6 @@ class ApplicationInfo(
   var blockManagersRemoved: ArrayBuffer[BlockManagerRemovedCase] =
     ArrayBuffer[BlockManagerRemovedCase]()
 
-  // From SparkListenerEnvironmentUpdate
-  var sparkProperties = Map.empty[String, String]
-  var classpathEntries = Map.empty[String, String]
-
   var appInfo: ApplicationCase = null
   var appId: String = ""
   val eventLogPath: String = eLogInfo.eventLog.toString
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
index 6ebdb93cc..d1728d147 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionStart}
 import org.apache.spark.sql.rapids.tool.EventProcessorBase
-import org.apache.spark.sql.rapids.tool.util.EventUtils
+import org.apache.spark.sql.rapids.tool.util.{EventUtils, StringUtils}
 
 /**
 * This class is to process all events and do validation in the end.
@@ -43,7 +43,7 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati logDebug("Processing event: " + event.getClass) super.doSparkListenerJobStart(app, event) val sqlIDString = event.properties.getProperty("spark.sql.execution.id") - val sqlID = ProfileUtils.stringToLong(sqlIDString) + val sqlID = StringUtils.stringToLong(sqlIDString) // add jobInfoClass val thisJob = new JobInfoClass( event.jobId, @@ -55,7 +55,7 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati None, None, None, - ProfileUtils.isPluginEnabled(event.properties.asScala) || app.gpuMode + app.isGPUModeEnabledForJob(event) ) app.jobIdToInfo.put(event.jobId, thisJob) } @@ -84,11 +84,6 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati } } - override def doSparkListenerLogStart(app: ApplicationInfo, event: SparkListenerLogStart): Unit = { - logDebug("Processing event: " + event.getClass) - app.sparkVersion = event.sparkVersion - } - override def doSparkListenerResourceProfileAdded( app: ApplicationInfo, event: SparkListenerResourceProfileAdded): Unit = { @@ -139,17 +134,9 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati app: ApplicationInfo, event: SparkListenerEnvironmentUpdate): Unit = { logDebug("Processing event: " + event.getClass) - app.handleEnvUpdateForCachedProps(event) - app.sparkProperties = event.environmentDetails("Spark Properties").toMap - app.classpathEntries = event.environmentDetails("Classpath Entries").toMap + super.doSparkListenerEnvironmentUpdate(app, event) - //Decide if this application is on GPU Mode - if (ProfileUtils.isPluginEnabled(collection.mutable.Map() ++= app.sparkProperties)) { - app.gpuMode = true - logDebug("App's GPU Mode = TRUE") - } else { - logDebug("App's GPU Mode = FALSE") - } + logDebug(s"App's GPU Mode = ${app.gpuMode}") } override def doSparkListenerApplicationStart( diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index d1571b292..b4c72140d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_F
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, SupportedMLFuncsName, ToolUtils}
@@ -99,6 +99,35 @@ class QualificationAppInfo(
     super.cleanupSQL(sqlID)
   }
 
+  override def handleEnvUpdateForCachedProps(event: SparkListenerEnvironmentUpdate): Unit = {
+    super.handleEnvUpdateForCachedProps(event)
+    if (gpuMode) {
+      throw GpuEventLogException(s"Cannot parse event logs from GPU run")
+    }
+    clusterTags = sparkProperties.getOrElse("spark.databricks.clusterUsageTags.clusterAllTags", "")
+    clusterTagClusterId =
+      sparkProperties.getOrElse("spark.databricks.clusterUsageTags.clusterId", "")
+    clusterTagClusterName =
+      sparkProperties.getOrElse("spark.databricks.clusterUsageTags.clusterName", "")
+  }
+
+  override def handleJobStartForCachedProps(event: SparkListenerJobStart): Unit = {
+    super.handleJobStartForCachedProps(event)
+    // If the confs are set after SparkSession initialization, they are captured in this event.
+    if (clusterTags.isEmpty) {
+      clusterTags = event.properties.getProperty(
+        "spark.databricks.clusterUsageTags.clusterAllTags", "")
+    }
+    if (clusterTagClusterId.isEmpty) {
+      clusterTagClusterId = event.properties.getProperty(
+        "spark.databricks.clusterUsageTags.clusterId", "")
+    }
+    if (clusterTagClusterName.isEmpty) {
+      clusterTagClusterName = event.properties.getProperty(
+        "spark.databricks.clusterUsageTags.clusterName", "")
+    }
+  }
+
   // time in ms
   private def calculateAppDuration(startTime: Long): Option[Long] = {
     if (startTime > 0) {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
index 3752fadac..8ced09a4b 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
@@ -25,30 +25,14 @@ import com.nvidia.spark.rapids.tool.profiling._
 
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.rapids.tool.{EventProcessorBase, GpuEventLogException, ToolUtils}
+import org.apache.spark.sql.rapids.tool.EventProcessorBase
+import org.apache.spark.sql.rapids.tool.util.StringUtils
 
 class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean)
   extends EventProcessorBase[QualificationAppInfo](app) {
 
   type T = QualificationAppInfo
 
-  override def doSparkListenerEnvironmentUpdate(
-      app: QualificationAppInfo,
-      event: SparkListenerEnvironmentUpdate): Unit = {
-    logDebug("Processing event: " + event.getClass)
-    app.handleEnvUpdateForCachedProps(event)
-    val sparkProperties = event.environmentDetails("Spark Properties").toMap
-    if (ToolUtils.isPluginEnabled(sparkProperties)) {
-      throw GpuEventLogException(s"Cannot parse event logs from GPU run")
-    }
-    app.clusterTags = sparkProperties.getOrElse(
-      "spark.databricks.clusterUsageTags.clusterAllTags", "")
-    app.clusterTagClusterId = sparkProperties.getOrElse(
-
"spark.databricks.clusterUsageTags.clusterId", "") - app.clusterTagClusterName = sparkProperties.getOrElse( - "spark.databricks.clusterUsageTags.clusterName", "") - } - override def doSparkListenerApplicationStart( app: QualificationAppInfo, event: SparkListenerApplicationStart): Unit = { @@ -133,12 +117,12 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean logDebug("Processing event: " + event.getClass) super.doSparkListenerJobStart(app, event) val sqlIDString = event.properties.getProperty("spark.sql.execution.id") - ProfileUtils.stringToLong(sqlIDString).foreach { sqlID => + StringUtils.stringToLong(sqlIDString).foreach { sqlID => event.stageIds.foreach { stageId => app.stageIdToSqlID.getOrElseUpdate(stageId, sqlID) } } - val sqlID = ProfileUtils.stringToLong(sqlIDString) + val sqlID = StringUtils.stringToLong(sqlIDString) // don't store if we are only processing per sql queries and the job isn't // related to a SQL query if ((perSqlOnly && sqlID.isDefined) || !perSqlOnly) { @@ -152,23 +136,10 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean None, None, None, - ProfileUtils.isPluginEnabled(event.properties.asScala) || app.gpuMode + app.isGPUModeEnabledForJob(event) ) app.jobIdToInfo.put(event.jobId, thisJob) } - // If the confs are set after SparkSession initialization, it is captured in this event. - if (app.clusterTags.isEmpty) { - app.clusterTags = event.properties.getProperty( - "spark.databricks.clusterUsageTags.clusterAllTags", "") - } - if (app.clusterTagClusterId.isEmpty) { - app.clusterTagClusterId = event.properties.getProperty( - "spark.databricks.clusterUsageTags.clusterId", "") - } - if (app.clusterTagClusterName.isEmpty) { - app.clusterTagClusterName = event.properties.getProperty( - "spark.databricks.clusterUsageTags.clusterName", "") - } } override def doSparkListenerJobEnd( diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala index 4674d363b..6e45ca61d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala @@ -16,9 +16,13 @@ package org.apache.spark.sql.rapids.tool.util +import java.lang.reflect.InvocationTargetException + +import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.profiling.TaskStageAccumCase +import org.json4s.jackson.JsonMethods.parse import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo @@ -36,7 +40,7 @@ object EventUtils extends Logging { * @return valid parsed long of the content or the duration */ @throws[NullPointerException] - def parseAccumFieldToLong(data: Any): Option[Long] = { + private def parseAccumFieldToLong(data: Any): Option[Long] = { val strData = data.toString try { Some(strData.toLong) @@ -88,4 +92,69 @@ object EventUtils extends Logging { defValue: String, targetValue: String): Boolean = { properties.getOrElse(propKey, defValue).equals(targetValue) } + + lazy val getEventFromJsonMethod: + String => Option[org.apache.spark.scheduler.SparkListenerEvent] = { + // Spark 3.4 and Databricks changed the signature on sparkEventFromJson + // Note that it is preferred we use reflection rather than checking Spark-runtime + // because some vendors may back-port features. 
+ val c = Class.forName("org.apache.spark.util.JsonProtocol") + val m = Try { + // versions prior to spark3.4 + c.getDeclaredMethod("sparkEventFromJson", classOf[org.json4s.JValue]) + } match { + case Success(a) => + (line: String) => + a.invoke(null, parse(line)).asInstanceOf[org.apache.spark.scheduler.SparkListenerEvent] + case Failure(_) => + // Spark3.4+ and databricks + val b = c.getDeclaredMethod("sparkEventFromJson", classOf[String]) + (line: String) => + b.invoke(null, line).asInstanceOf[org.apache.spark.scheduler.SparkListenerEvent] + } + // At this point, the method is already defined. + // Note that the Exception handling is moved within the method to make it easier + // to isolate the exception reason. + (line: String) => Try { + m.apply(line) + } match { + case Success(i) => Some(i) + case Failure(e) => + + e match { + case i: InvocationTargetException => + val targetEx = i.getTargetException + if (targetEx != null) { + targetEx match { + case j: com.fasterxml.jackson.core.io.JsonEOFException => + // Spark3.41+ embeds JsonEOFException in the InvocationTargetException + // We need to show a warning message instead of failing the entire app. + logWarning(s"Incomplete eventlog, ${j.getMessage}") + case k: com.fasterxml.jackson.core.JsonParseException => + // this is a parser error thrown by spark-3.4+ which indicates the log is + // malformed + throw k + case z: ClassNotFoundException if z.getMessage != null => + logWarning(s"ClassNotFoundException while parsing an event: ${z.getMessage}") + case t: Throwable => + // We do not want to swallow unknown exceptions so that we can handle later + logError(s"Unknown exception while parsing an event", t) + } + } else { + // Normally it should not happen that invocation target is null. + logError(s"Unknown exception while parsing an event", i) + } + case j: com.fasterxml.jackson.core.io.JsonEOFException => + // Note that JsonEOFException is child of JsonParseException + // In case the eventlog is incomplete (i.e., inprogress), we show a warning message + // because we do not want to cause the entire app to fail. + logWarning(s"Incomplete eventlog, ${j.getMessage}") + case k: com.fasterxml.jackson.core.JsonParseException => + // this is a parser error thrown by version prior to spark-3.4+ which indicates the + // log is malformed + throw k + } + None + } + } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/StringUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/StringUtils.scala index 177282ddc..158c6c68f 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/StringUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/StringUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,15 +18,19 @@ package org.apache.spark.sql.rapids.tool.util import scala.concurrent.duration.{DurationDouble, DurationLong} +import org.apache.spark.internal.Logging + /** * Utility containing the implementation of helpers used for parsing and/or formatting * strings. */ -object StringUtils { +object StringUtils extends Logging { // Regular expression for duration-format 'H+:MM:SS.FFF' // Note: this is not time-of-day. Hours can be larger than 12. 
private val regExDurationFormat = "^(\\d+):([0-5]\\d):([0-5]\\d\\.\\d+)$" - + private val regExMemorySize = + "^(?i)(\\d+(?:\\.\\d+)?)(b|k(?:ib|b)?|m(?:ib|b)?|g(?:ib|b)?|t(?:ib|b)?|p(?:ib|b)?)$".r + val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p") /** * Checks if the strData is of pattern 'HH:MM:SS.FFF' and return the time as long if applicable. * If the string is not in the expected format, the result is None. @@ -51,4 +55,40 @@ object StringUtils { def reformatCSVString(str: String): String = { "\"" + str.replace("\"", "\"\"") + "\"" } + + // Convert a null-able String to Option[Long] + def stringToLong(in: String): Option[Long] = try { + Some(in.toLong) + } catch { + case _: NumberFormatException => None + } + + def isMemorySize(value: String): Boolean = { + value.matches(regExMemorySize.regex) + } + + /** + * Converts size from human readable to bytes. + * Eg, "4m" -> 4194304. + */ + def convertMemorySizeToBytes(value: String): Long = { + regExMemorySize.findFirstMatchIn(value.toLowerCase) match { + case None => + // try to convert to long + stringToLong(value) match { + case Some(num) => num + case _ => + logError(s"Could not convert memorySize input [$value] to Long") + 0L + } + case Some(m) => + val unitSize = m.group(2).substring(0, 1) + val sizeNum = m.group(1).toDouble + (sizeNum * Math.pow(1024, SUPPORTED_SIZE_UNITS.indexOf(unitSize))).toLong + } + } + + def convertToMB(value: String): Long = { + convertMemorySizeToBytes(value) / (1024 * 1024) + } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index dca5cff9a..df2015956 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -694,7 +694,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { assert(apps.size == 1) val collect = new CollectInformation(apps) for (_ <- apps) { - val rapidsProps = collect.getProperties(rapidsOnly = true) + val rapidsProps = collect.getRapidsProperties val rows = rapidsProps.map(_.rows.head) assert(rows.length == 5) // 5 properties captured. // verify ucx parameters are captured. @@ -703,7 +703,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { //verify gds parameters are captured. 
assert(rows.contains("spark.rapids.memory.gpu.direct.storage.spill.alignedIO")) - val sparkProps = collect.getProperties(rapidsOnly = false) + val sparkProps = collect.getSparkProperties val sparkPropsRows = sparkProps.map(_.rows.head) assert(sparkPropsRows.contains("spark.eventLog.dir")) assert(sparkPropsRows.contains("spark.plugins")) @@ -766,7 +766,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f => f.endsWith(".csv") }) - assert(dotDirs.length === 17) + assert(dotDirs.length === 18) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -793,7 +793,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f => f.endsWith(".csv") }) - assert(dotDirs.length === 13) + assert(dotDirs.length === 14) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -823,7 +823,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f => f.endsWith(".csv") }) - assert(dotDirs.length === 17) + assert(dotDirs.length === 18) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -853,7 +853,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f => f.endsWith(".csv") }) - assert(dotDirs.length === 15) + assert(dotDirs.length === 16) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala index 38d165810..17c6de042 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.rapids.tool.util.WebCrawlerUtil case class DriverInfoProviderMockTest(unsupportedOps: Seq[DriverLogUnsupportedOperators]) extends BaseDriverLogInfoProvider(None) { - override def getUnsupportedOperators = unsupportedOps + override def getUnsupportedOperators: Seq[DriverLogUnsupportedOperators] = unsupportedOps } class AppInfoProviderMockTest(val maxInput: Double, @@ -53,7 +53,9 @@ class AppInfoProviderMockTest(val maxInput: Double, override def getMeanShuffleRead: Double = meanShuffleRead override def getSpilledMetrics: Seq[Long] = spilledMetrics override def getJvmGCFractions: Seq[Double] = jvmGCFractions - override def getProperty(propKey: String): Option[String] = propsFromLog.get(propKey) + override def getRapidsProperty(propKey: String): Option[String] = propsFromLog.get(propKey) + override def getSparkProperty(propKey: String): Option[String] = propsFromLog.get(propKey) + override def getSystemProperty(propKey: String): Option[String] = propsFromLog.get(propKey) override def getSparkVersion: Option[String] = sparkVersion override def getRapidsJars: Seq[String] = rapidsJars override def getDistinctLocationPct: Double = distinctLocationPct @@ -749,6 +751,74 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging { assert(expectedResults == autoTunerOutput) } + test("AutoTuner detects non UTF-8 file-encoding") { + // When 
system properties has an entry for file-encoding that is not supported by GPU for + // certain expressions. Then the AutoTuner should generate a comment warning that the + // file-encoding is not one of the supported ones. + val customProps = mutable.LinkedHashMap( + "spark.executor.cores" -> "8", + "spark.executor.memory" -> "47222m", + "spark.rapids.shuffle.multiThreaded.reader.threads" -> "8", + "spark.rapids.shuffle.multiThreaded.writer.threads" -> "8", + "spark.rapids.sql.concurrentGpuTasks" -> "2", + "spark.rapids.sql.multiThreadedRead.numThreads" -> "20", + "spark.shuffle.manager" -> "com.nvidia.spark.rapids.spark311.RapidsShuffleManager", + "spark.task.resource.gpu.amount" -> "0.0625") + // mock the properties loaded from eventLog + val logEventsProps: mutable.Map[String, String] = + mutable.LinkedHashMap[String, String]( + // set the file-encoding to non UTF-8 + "file.encoding" -> "ISO-8859-1", + "spark.executor.cores" -> "16", + "spark.executor.instances" -> "1", + "spark.executor.memory" -> "80g", + "spark.executor.resource.gpu.amount" -> "1", + "spark.executor.instances" -> "1", + "spark.sql.shuffle.partitions" -> "200", + "spark.sql.files.maxPartitionBytes" -> "1g", + "spark.task.resource.gpu.amount" -> "0.0625", + "spark.rapids.memory.pinnedPool.size" -> "5g", + "spark.rapids.sql.enabled" -> "true", + "spark.plugins" -> "com.nvidia.spark.SQLPlugin", + "spark.rapids.sql.concurrentGpuTasks" -> "4") + val dataprocWorkerInfo = buildWorkerInfoAsString(Some(customProps), Some(32), + Some("212992MiB"), Some(5), Some(4), Some("15109MiB"), Some("Tesla T4")) + val infoProvider = getMockInfoProvider(8126464.0, Seq(0), Seq(0.004), logEventsProps, + Some(defaultSparkVersion)) + val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(dataprocWorkerInfo, infoProvider) + val (properties, comments) = autoTuner.getRecommendedProperties() + val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments) + // scalastyle:off line.size.limit + val expectedResults = + s"""| + |Spark Properties: + |--conf spark.executor.cores=8 + |--conf spark.executor.instances=20 + |--conf spark.executor.memory=16384m + |--conf spark.executor.memoryOverhead=6758m + |--conf spark.rapids.memory.pinnedPool.size=4096m + |--conf spark.rapids.sql.batchSizeBytes=2147483647 + |--conf spark.rapids.sql.concurrentGpuTasks=2 + |--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m + |--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=160 + |--conf spark.sql.files.maxPartitionBytes=4096m + |--conf spark.task.resource.gpu.amount=0.125 + | + |Comments: + |- 'spark.executor.memoryOverhead' must be set if using 'spark.rapids.memory.pinnedPool.size + |- 'spark.executor.memoryOverhead' was not set. + |- 'spark.rapids.sql.batchSizeBytes' was not set. + |- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set. + |- 'spark.sql.adaptive.autoBroadcastJoinThreshold' was not set. + |- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set. + |- 'spark.sql.adaptive.enabled' should be enabled for better performance. + |- ${AutoTuner.classPathComments("rapids.jars.missing")} + |- ${AutoTuner.classPathComments("rapids.shuffle.jars")} + |- file.encoding should be [UTF-8] because GPU only supports the charset when using some expressions. + |""".stripMargin + // scalastyle:on line.size.limit + assert(expectedResults == autoTunerOutput) + } // Test that the properties from the eventlogs will be used to calculate the recommendations. 
// For example, the output should recommend "spark.executor.cores" -> 8. Although the cluster // "spark.executor.cores" is the same as the recommended value, the property is set to 16 in