Commit 96b4a3d

profiler resolve merge conflict
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
cindyyuanjiang committed Feb 24, 2025
2 parents 926995c + fdd0f13 commit 96b4a3d
Showing 110 changed files with 5,118 additions and 1,017 deletions.
3 changes: 2 additions & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>24.12.2-SNAPSHOT</version>
<version>24.12.5-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

@@ -417,6 +417,7 @@
<maven.artifact.version>3.9.0</maven.artifact.version>
<scala.javac.args>-Xlint:all,-serial,-path,-try</scala.javac.args>
<rapids.shade.package>com.nvidia.shaded.spark</rapids.shade.package>
<benchmarks.checkpoints>noOp</benchmarks.checkpoints>
<jsoup.version>1.16.1</jsoup.version>
<!-- properties used for DeltaLake -->
<delta10x.version>1.0.1</delta10x.version>
184 changes: 184 additions & 0 deletions core/src/main/resources/bootstrap/tuningTable.yaml
@@ -0,0 +1,184 @@
# Copyright (c) 2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

tuningDefinitions:
- label: spark.databricks.adaptive.autoOptimizeShuffle.enabled
description: 'Auto-Optimized shuffle. It is recommended to turn it off and set (spark.sql.shuffle.partitions) manually.'
enabled: true
level: job
category: tuning
- label: spark.dataproc.enhanced.execution.enabled
description: 'Enables enhanced execution. Turning this on might cause the GPU-accelerated Dataproc cluster to hang.'
enabled: true
level: job
category: tuning
comments:
persistent: 'should be disabled. WARN: Turning this property on might cause the GPU-accelerated Dataproc cluster to hang.'
- label: spark.dataproc.enhanced.optimizer.enabled
description: 'Enables enhanced optimizer. Turning this on might cause the GPU-accelerated Dataproc cluster to hang.'
enabled: true
level: job
category: tuning
comments:
persistent: 'should be disabled. WARN: Turning this property on might cause the GPU-accelerated Dataproc cluster to hang.'
- label: spark.executor.cores
description: 'The number of cores to use on each executor. It is recommended to be set to 16.'
enabled: true
level: cluster
category: tuning
- label: spark.executor.instances
description: 'Controls the parallelism level. It is recommended to be set to (cpuCoresPerNode * numWorkers) / spark.executor.cores.'
enabled: true
level: cluster
category: tuning
- label: spark.executor.memory
description: 'Amount of memory to use per executor process. This is tuned based on the available CPU memory on the worker node.'
enabled: true
level: cluster
category: tuning
- label: spark.executor.memoryOverhead
description: 'Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size.'
enabled: true
level: cluster
category: tuning
- label: spark.executor.memoryOverheadFactor
description: 'Fraction of executor memory to be allocated as additional non-heap memory per executor process. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size.'
enabled: true
level: cluster
category: tuning
- label: spark.kubernetes.memoryOverheadFactor
description: 'Specific to K8s. Fraction of executor memory to be allocated as additional non-heap memory per executor process.'
enabled: true
level: cluster
category: tuning
- label: spark.locality.wait
description: 'The time to wait to launch a data-local task before giving up and launching it on a less-local node. It is recommended to avoid waiting for a data-local task.'
enabled: true
level: cluster
category: tuning
defaultSpark: "3s"
- label: spark.rapids.filecache.enabled
description: 'Enables RAPIDS file cache. The file cache stores data locally in the same local directories that have been configured for the Spark executor.'
enabled: true
level: job
category: tuning
- label: spark.rapids.memory.pinnedPool.size
description: 'The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.shuffle.multiThreaded.maxBytesInFlight
description: 'This property controls the number of bytes we allow in flight per Spark task. This typically matters on the reader side: when blocks are received from the network, they are queued onto these threads for decompression and decode.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.shuffle.multiThreaded.reader.threads
description: 'The shuffle reader is a single implementation irrespective of the number of partitions. Set the value to zero to turn off the multi-threaded reader entirely.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.shuffle.multiThreaded.writer.threads
description: ''
enabled: true
level: cluster
category: tuning
- label: spark.rapids.sql.batchSizeBytes
description: 'Set the target number of bytes for a GPU batch. Split sizes for input data are covered by separate configs.'
enabled: true
level: job
category: tuning
- label: spark.rapids.sql.concurrentGpuTasks
description: 'Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.sql.format.parquet.multithreaded.combine.waitTime
description: 'When using the multithreaded parquet reader with combine mode, how long to wait, in milliseconds, for more files to finish if the size threshold has not been met. Note that this will wait this amount of time from when the last file was available, so the total wait time could be larger than this. DEPRECATED: use spark.rapids.sql.reader.multithreaded.combine.waitTime instead.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.sql.enabled
description: 'Should be true to enable SQL operations on the GPU.'
enabled: true
level: cluster
category: functionality
- label: spark.rapids.sql.multiThreadedRead.numThreads
description: 'The maximum number of threads on each executor to use for reading small files in parallel.'
enabled: true
level: cluster
category: tuning
- label: spark.rapids.sql.reader.multithreaded.combine.sizeBytes
description: 'The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files.'
enabled: true
level: cluster
category: tuning
- label: spark.shuffle.manager
description: 'The RAPIDS Shuffle Manager is an implementation of the ShuffleManager interface in Apache Spark that allows custom mechanisms to exchange shuffle data. We currently expose two modes of operation: Multi Threaded and UCX.'
enabled: true
level: cluster
category: tuning
- label: spark.sql.adaptive.enabled
description: 'When true, enable adaptive query execution, which re-optimizes the query plan in the middle of query execution, based on accurate runtime statistics.'
enabled: true
level: job
category: tuning
defaultSpark: "true"
- label: spark.sql.adaptive.advisoryPartitionSizeInBytes
description: 'The advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.'
enabled: true
level: job
category: tuning
- label: spark.sql.adaptive.coalescePartitions.initialPartitionNum
description: 'The initial number of shuffle partitions before coalescing. If not set, it equals spark.sql.shuffle.partitions.'
enabled: true
level: job
category: tuning
- label: spark.sql.adaptive.coalescePartitions.minPartitionNum
description: '(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark cluster.'
enabled: true
level: job
category: tuning
- label: spark.sql.adaptive.coalescePartitions.minPartitionSize
description: 'The minimum size of shuffle partitions after coalescing. This is useful when the adaptively calculated target size is too small during partition coalescing.'
enabled: true
level: job
category: tuning
defaultSpark: "1m"
- label: spark.sql.adaptive.coalescePartitions.parallelismFirst
description: 'When true, Spark does not respect the target size specified by (spark.sql.adaptive.advisoryPartitionSizeInBytes) (default 64MB) when coalescing contiguous shuffle partitions, but adaptively calculates the target size according to the default parallelism of the Spark cluster.'
enabled: true
level: job
category: tuning
defaultSpark: "true"
- label: spark.sql.adaptive.autoBroadcastJoinThreshold
description: 'Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled.'
enabled: true
level: job
category: tuning
- label: spark.sql.files.maxPartitionBytes
description: 'The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.'
enabled: true
level: job
category: tuning
- label: spark.sql.shuffle.partitions
description: 'The default number of partitions to use when shuffling data for joins or aggregations. Note: For structured streaming, this configuration cannot be changed between query restarts from the same checkpoint location.'
enabled: true
level: job
category: tuning
defaultSpark: "200"
- label: spark.task.resource.gpu.amount
description: 'The GPU resource amount per task when Apache Spark schedules GPU resources. For example, setting the value to 1 means that only one task will run concurrently per executor.'
enabled: true
level: cluster
category: tuning
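
Each entry in the new tuningTable.yaml above follows one schema: a label (the Spark property), a description, an enabled flag, a level (cluster or job), a category (tuning or functionality), plus optional comments and defaultSpark fields. As a rough illustration of how a consumer could model and filter this table, here is a minimal Scala sketch; TuningEntry and clusterTunings are hypothetical names, not the tool's actual loader.

    case class TuningEntry(
        label: String,
        description: String,
        enabled: Boolean,
        level: String,                        // "cluster" or "job"
        category: String,                     // "tuning" or "functionality"
        defaultSpark: Option[String] = None)  // Spark's own default, when tracked

    // Select the enabled, cluster-level tuning properties an auto-tuner would emit.
    def clusterTunings(entries: Seq[TuningEntry]): Seq[TuningEntry] =
      entries.filter(e => e.enabled && e.level == "cluster" && e.category == "tuning")
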
3 changes: 2 additions & 1 deletion core/src/main/resources/configs/build.properties
@@ -1,5 +1,5 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,3 +23,4 @@ build.spark.version=${spark.version}
build.hadoop.version=${hadoop.version}
build.java.version=${java.version}
build.scala.version=${scala.version}
build.benchmarks.checkpoints=${benchmarks.checkpoints}
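
The new build.benchmarks.checkpoints entry is filled in by Maven resource filtering from the benchmarks.checkpoints property added to core/pom.xml (default noOp), which makes the value readable at runtime. Below is a minimal sketch of reading it from the classpath, assuming the filtered file ships under /configs/ as in this module; the BuildInfo object is illustrative, not the tool's actual API.

    import java.util.Properties

    object BuildInfo {
      // Load the Maven-filtered build metadata bundled with the jar.
      def load(): Properties = {
        val props = new Properties()
        val in = getClass.getResourceAsStream("/configs/build.properties")
        if (in != null) {
          try props.load(in) finally in.close()
        }
        props
      }

      // Falls back to "noOp", mirroring the default declared in core/pom.xml.
      def benchmarksCheckpoints: String =
        load().getProperty("build.benchmarks.checkpoints", "noOp")
    }
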
28 changes: 23 additions & 5 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool
import scala.annotation.tailrec

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.tuning.ClusterProperties
import com.nvidia.spark.rapids.tool.tuning.{ClusterProperties, ProfilingAutoTunerConfigsProvider}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
@@ -77,7 +77,18 @@ object InstanceInfo {
// format (numGpus, numCores) -> InstanceInfo about that CSP node instance type
object PlatformInstanceTypes {

val AWS_BY_GPUS_CORES = Map((1, 4) -> InstanceInfo(4, 16 * 1024, "g5.xlarge", 1),
// Using G6 instances for EMR
val EMR_BY_GPUS_CORES = Map((1, 4) -> InstanceInfo(4, 16 * 1024, "g6.xlarge", 1),
(1, 8) -> InstanceInfo(8, 32 * 1024, "g6.2xlarge", 1),
(1, 16) -> InstanceInfo(16, 64 * 1024, "g6.4xlarge", 1),
(1, 32) -> InstanceInfo(32, 128 * 1024, "g6.8xlarge", 1),
(4, 48) -> InstanceInfo(48, 192 * 1024, "g6.12xlarge", 1),
(1, 64) -> InstanceInfo(64, 256 * 1024, "g6.16xlarge", 1)
)

// Using G5 instances. To be updated once G6 availability on Databricks
// is consistent
val DATABRICKS_AWS_BY_GPUS_CORES = Map((1, 4) -> InstanceInfo(4, 16 * 1024, "g5.xlarge", 1),
(1, 8) -> InstanceInfo(8, 32 * 1024, "g5.2xlarge", 1),
(1, 16) -> InstanceInfo(16, 64 * 1024, "g5.4xlarge", 1),
(1, 32) -> InstanceInfo(32, 128 * 1024, "g5.8xlarge", 1),
@@ -147,7 +158,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
var recommendedClusterInfo: Option[RecommendedClusterInfo] = None

// Default recommendation based on NDS benchmarks (note: this could be platform specific)
def recommendedCoresPerExec = 16
def recommendedCoresPerExec: Int = ProfilingAutoTunerConfigsProvider.DEF_CORES_PER_EXECUTOR
// Default number of GPUs to use; currently we do not support multiple GPUs per node
def recommendedGpusPerNode = 1
def defaultNumGpus: Int = 1
@@ -559,7 +570,7 @@ class DatabricksAwsPlatform(gpuDevice: Option[GpuDevice],
override val defaultGpuDevice: GpuDevice = A10GGpu

override def getInstanceByResourcesMap: Map[(Int, Int), InstanceInfo] = {
PlatformInstanceTypes.AWS_BY_GPUS_CORES
PlatformInstanceTypes.DATABRICKS_AWS_BY_GPUS_CORES
}
}

@@ -579,6 +590,13 @@ class DataprocPlatform(gpuDevice: Option[GpuDevice],
clusterProperties: Option[ClusterProperties]) extends Platform(gpuDevice, clusterProperties) {
override val platformName: String = PlatformNames.DATAPROC
override val defaultGpuDevice: GpuDevice = T4Gpu
override val recommendationsToInclude: Seq[(String, String)] = Seq(
// Keep disabled. This property does not work well with GPU clusters.
"spark.dataproc.enhanced.optimizer.enabled" -> "false",
// Keep disabled. This property does not work well with GPU clusters.
"spark.dataproc.enhanced.execution.enabled" -> "false"
)

override def isPlatformCSP: Boolean = true
override def maxGpusSupported: Int = 4

@@ -629,7 +647,7 @@ class EmrPlatform(gpuDevice: Option[GpuDevice],
}

override def getInstanceByResourcesMap: Map[(Int, Int), InstanceInfo] = {
PlatformInstanceTypes.AWS_BY_GPUS_CORES
PlatformInstanceTypes.EMR_BY_GPUS_CORES
}
}

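Both maps are keyed by a (numGpus, numCores) pair, so each platform can resolve a concrete node type from the resources the tuner wants; splitting EMR_BY_GPUS_CORES (G6) from DATABRICKS_AWS_BY_GPUS_CORES (G5) lets the same lookup recommend a different instance family per platform. Here is a hedged sketch of such a lookup, using the InstanceInfo field order seen above (cores, memory in MiB, instance name, GPUs per node); pickInstance is a hypothetical helper, not the actual consumer of getInstanceByResourcesMap.

    case class InstanceInfo(cores: Int, memoryMB: Long, name: String, numGpus: Int)

    // Pick the smallest single-GPU instance satisfying the requested core count.
    def pickInstance(byGpusCores: Map[(Int, Int), InstanceInfo],
        requiredCores: Int): Option[InstanceInfo] = {
      byGpusCores.collect {
        case ((gpus, cores), info) if gpus == 1 && cores >= requiredCores => info
      }.toSeq.sortBy(_.cores).headOption
    }

    // e.g. against the EMR map above, requiredCores = 12 resolves to g6.4xlarge.
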
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster,
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, DataSourceRecord}
import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

/**
@@ -132,32 +132,6 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
IODiagnosticMetricsMap(key) += accum
}

/**
* Retrieves the task IDs associated with a specific stage.
*
* @param stageId The ID of the stage.
* @return A seq of task IDs corresponding to the given stage ID.
*/
private def getStageTaskIds(stageId: Int): Seq[Long] = {
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct
}

/**
* Retrieves task update values from the accumulator info for the specified stage ID.
*
* @param accumInfo AccumInfo object containing the task updates map.
* @param stageId The stage ID for which task updates need to be retrieved.
* @return An array of task update values (`Long`) corresponding to the tasks
* in the specified stage.
*/
private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long])
: Array[Long] = {
stageTaskIds.collect {
case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
accumInfo.taskUpdatesMap(taskId)
}(breakOut)
}

/**
* Connects Operators to Stages using AccumulatorIDs.
* TODO: This function can be fused in the visitNode function to avoid the extra iteration.
@@ -432,7 +406,6 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// TODO: currently if stage IDs is empty, the result is skipped
val stageIds = sqlAccums.head.stageIds
stageIds.flatMap { stageId =>
val stageTaskIds = getStageTaskIds(stageId)
val nodeName = sqlAccums.head.nodeName

// Initialize a map to store statistics for each IO metric
@@ -445,15 +418,19 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val accumInfoOpt = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)

val metricStats: Option[StatisticsMetrics] = accumInfoOpt.flatMap { accumInfo =>
if (!accumInfo.stageValuesMap.contains(stageId)) {
if (!accumInfo.containsStage(stageId)) {
None
} else if (stageIds.size == 1) {
// Skip computing statistics when there is only one stage
Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
Some(StatisticsMetrics(
min = sqlAccum.min,
med = sqlAccum.median,
max = sqlAccum.max,
count = 0,
total = sqlAccum.total))
} else {
// Retrieve task updates which correspond to the current stage
val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo, stageTaskIds)
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
accumInfo.calculateAccStatsForStage(stageId)
}
}

@@ -567,13 +544,9 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keys.flatMap( stageId => {
// Retrieve task updates correspond to the current stage
val filteredTaskUpdates =
filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId))

accumInfo.getStageIds.flatMap( stageId => {
// Get the task updates that belong to that stage
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match {
accumInfo.calculateAccStatsForStage(stageId) match {
case Some(stat) =>
// Reuse AccumProfileResults to avoid allocating new objects
val accumProfileResults = AccumProfileResults(
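The refactor above replaces the two removed helpers (getStageTaskIds and filterAccumTaskUpdatesForStage) with a single accumInfo.calculateAccStatsForStage(stageId) call, moving per-stage task filtering inside AccumInfo. Its net effect is the reduction sketched below: collapse one stage's task-level accumulator updates into the five-field StatisticsMetrics (the count = 0 placeholder in the single-stage shortcut matches the new fifth field). AccumInfo's internals are not shown in this diff, so statsForStage is an assumption about the shape of that computation, not the actual implementation.

    case class StatisticsMetrics(min: Long, med: Long, max: Long, count: Long, total: Long)

    // Reduce the task updates recorded for a single stage into summary statistics.
    def statsForStage(taskUpdates: Array[Long]): Option[StatisticsMetrics] = {
      if (taskUpdates.isEmpty) {
        None
      } else {
        val sorted = taskUpdates.sorted
        Some(StatisticsMetrics(
          min = sorted.head,
          med = sorted(sorted.length / 2),  // simple midpoint median
          max = sorted.last,
          count = sorted.length,
          total = sorted.sum))
      }
    }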
