Calculate task metric aggregates on-the-fly to reduce memory usage #1543

Merged
merged 16 commits on Feb 21, 2025
Changes from 9 commits
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster,
import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, DataSourceRecord}
import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

/**
@@ -108,32 +108,6 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
IODiagnosticMetricsMap(key) += accum
}

/**
* Retrieves the task IDs associated with a specific stage.
*
* @param stageId The ID of the stage.
* @return A seq of task IDs corresponding to the given stage ID.
*/
private def getStageTaskIds(stageId: Int): Seq[Long] = {
app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct
}

/**
* Retrieves task update values from the accumulator info for the specified stage ID.
*
* @param accumInfo AccumInfo object containing the task updates map.
* @param stageId The stage ID for which task updates need to be retrived.
* @return An array of task update values (`Long`) corresponding to the tasks
* in the specified stage.
*/
private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long])
: Array[Long] = {
stageTaskIds.collect {
case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
accumInfo.taskUpdatesMap(taskId)
}(breakOut)
}

/**
* Connects Operators to Stages using AccumulatorIDs.
* TODO: This function can be fused in the visitNode function to avoid the extra iteration.
@@ -406,7 +380,6 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
// TODO: currently if stage IDs is empty, the result is skipped
val stageIds = sqlAccums.head.stageIds
stageIds.flatMap { stageId =>
val stageTaskIds = getStageTaskIds(stageId)
val nodeName = sqlAccums.head.nodeName

// Initialize a map to store statistics for each IO metric
@@ -426,8 +399,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
} else {
// Retrieve task updates which correspond to the current stage
val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo, stageTaskIds)
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
accumInfo.calculateAccStatsForStage(stageId)
}
}

@@ -477,12 +449,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keys.flatMap( stageId => {
// Retrieve task updates correspond to the current stage
val filteredTaskUpdates =
filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId))

// Get the task updates that belong to that stage
StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match {
accumInfo.calculateAccStatsForStage(stageId) match {
case Some(stat) =>
// Reuse AccumProfileResults to avoid generating allocating new objects
val accumProfileResults = AccumProfileResults(
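As an aside, here is a minimal sketch of the lookup pattern the analyzer above now follows (a standalone toy, not the tool's actual classes; `Stats` stands in for `StatisticsMetrics`, and the sample values are made up): per-stage statistics are read from the aggregate that `AccumInfo` already maintains, instead of re-collecting every task update for the stage via `getStageTaskIds` and `filterAccumTaskUpdatesForStage`.

```scala
// Toy illustration of the new per-stage lookup (hypothetical stand-ins).
object StageLookupSketch {
  final case class Stats(min: Long, med: Long, max: Long, total: Long)

  // Per-stage aggregates as kept after this change: stageId -> (running total, running stats)
  val stageValues: Map[Int, (Long, Stats)] = Map(
    (0, (400L, Stats(10L, 40L, 90L, 4L))),
    (1, (120L, Stats(5L, 30L, 60L, 3L))))

  // Mirrors calculateAccStatsForStage: min/med/max come from the running stats,
  // while the reported total is the stage-level accumulated value.
  def statsForStage(stageId: Int): Option[Stats] =
    stageValues.get(stageId).map { case (total, s) => Stats(s.min, s.med, s.max, total) }

  def main(args: Array[String]): Unit =
    println(Seq(0, 1, 2).flatMap(statsForStage)) // stage 2 has no entry, so it is skipped
}
```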
@@ -399,15 +399,15 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {

val accumHelperObj = if (app.isPhoton) { // If this a photon app, use the photonHelper
// For max peak memory, we need to look at the accumulators at the task level.
val peakMemoryValues = tasksInStage.flatMap { taskModel =>
photonPeakMemoryAccumInfos.flatMap { accumInfo =>
accumInfo.taskUpdatesMap.get(taskModel.taskId)
}
}
// We leverage the stage level metrics and get the max task update from it
val peakMemoryValues = photonPeakMemoryAccumInfos.flatMap { accumInfo =>
accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
}.map(_._2.max)
// For sum of shuffle write time, we need to look at the accumulators at the stage level.
// We get the values associated with all tasks for it
val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo =>
accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
}
}.map(_._1)
new AggAccumPhotonHelper(shuffleWriteValues, peakMemoryValues)
} else {
// For non-Photon apps, use the task metrics directly.
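For reference, a small self-contained sketch of how the Photon helper now picks its inputs from the per-stage tuple (values are hypothetical; the layout follows the diff, i.e. stageId -> (stage-level total, task-update stats)): peak memory takes the max recorded in the running task stats, while shuffle write time takes the stage-level total.

```scala
// Toy sketch of reading the two halves of the per-stage tuple (made-up data).
object PhotonTupleSketch {
  final case class Stats(min: Long, med: Long, max: Long, count: Long)

  val peakMemoryByStage: Map[Int, (Long, Stats)] =
    Map((3, (0L, Stats(128L, 256L, 512L, 8L))))
  val shuffleWriteByStage: Map[Int, (Long, Stats)] =
    Map((3, (42000L, Stats(1000L, 5000L, 9000L, 8L))))

  def main(args: Array[String]): Unit = {
    val stageId = 3
    // Peak memory: the largest task update seen for the stage (second element, .max).
    val peakMemoryValues = peakMemoryByStage.get(stageId).map(_._2.max)
    // Shuffle write time: the stage-level accumulated total (first element).
    val shuffleWriteValues = shuffleWriteByStage.get(stageId).map(_._1)
    println((peakMemoryValues, shuffleWriteValues))
  }
}
```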
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis
import org.apache.spark.sql.rapids.tool.util.InPlaceMedianArrView.{chooseMidpointPivotInPlace, findMedianInPlace}

// Store (min, median, max, total) for a given metric
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)
case class StatisticsMetrics(min: Long, med: Long, var max: Long, var total: Long)

object StatisticsMetrics {
// a static variable used to represent zero-statistics instead of allocating a dummy record
@@ -60,7 +60,7 @@ object GenerateTimeline {
private val FOOTER_HEIGHT = FONT_SIZE + (PADDING * 2)
private val MS_PER_PIXEL = 5.0

// Generated using https://mokole.com/palette.html
// Generated using https://mokole.com/palette.html
private val COLORS = Array(
"#696969",
"#dcdcdc",
@@ -276,21 +276,15 @@

val semMetricsNs = semWaitIds.toList
.flatMap(app.accumManager.accumInfoMap.get)
.flatMap(_.taskUpdatesMap.values).sum
.flatMap(_.stageValuesMap.values).map(_._1).sum

val semMetricsMs = app.accumManager.accumInfoMap.flatMap {
case (_, accumInfo: AccumInfo)
if accumInfo.infoRef.name == AccumNameRef.NAMES_TABLE.get("gpuSemaphoreWait") =>
Some(accumInfo.taskUpdatesMap.values.sum)
Some(accumInfo.stageValuesMap.values.map(_._1).sum)
case _ => None
}.sum

val readMetrics = readTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

val opMetrics = opTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

val writeMetrics = writeTimeIds.toList.flatMap(app.accumManager.accumInfoMap.get)

app.taskManager.getAllTasks().foreach { tc =>
val host = tc.host
val execId = tc.executorId
@@ -300,11 +294,9 @@
val finishTime = tc.finishTime
val duration = tc.duration
val semTimeMs = ( semMetricsNs / 1000000) + semMetricsMs
val readTimeMs = readMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000 +
tc.sr_fetchWaitTime
val opTimeMs = opMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000
val writeTimeMs = writeMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000 +
tc.sw_writeTime / 1000000
val readTimeMs = tc.sr_fetchWaitTime
val opTimeMs = 0L
val writeTimeMs = tc.sw_writeTime / 1000000
val taskInfo = new TimelineTaskInfo(stageId, taskId, launchTime, finishTime, duration,
tc.executorDeserializeTime, readTimeMs, semTimeMs, opTimeMs, writeTimeMs)
val execHost = s"$execId/$host"
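A brief sketch of the timeline timings after this change (hypothetical numbers; `tcSrFetchWaitTime` and `tcSwWriteTime` stand in for the task fields used in the diff): with the per-task accumulator map gone, read time falls back to the shuffle fetch wait, write time to the task's shuffle write time, and per-task operator time is no longer derivable, so it is reported as zero; semaphore time is summed from the per-stage totals and converted from nanoseconds.

```scala
// Minimal sketch of the fallback timings (all values are made up).
object TimelineFallbackSketch {
  def main(args: Array[String]): Unit = {
    val semMetricsNs = 3000000L     // semaphore wait summed from per-stage totals, in ns
    val semMetricsMs = 2L           // accumulators that already report ms
    val tcSrFetchWaitTime = 15L     // shuffle fetch wait, ms (per-task field)
    val tcSwWriteTime = 8000000L    // shuffle write time, ns (per-task field)

    val semTimeMs = semMetricsNs / 1000000 + semMetricsMs
    val readTimeMs = tcSrFetchWaitTime  // per-task read accumulators no longer consulted
    val opTimeMs = 0L                   // per-task operator time no longer derivable
    val writeTimeMs = tcSwWriteTime / 1000000
    println(Seq(semTimeMs, readTimeMs, opTimeMs, writeTimeMs))
  }
}
```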
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.tool.store

import scala.collection.{breakOut, mutable}
import scala.collection.mutable

import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics

@@ -32,81 +32,133 @@ import org.apache.spark.sql.rapids.tool.util.EventUtils.parseAccumFieldToLong
* @param infoRef - AccumMetaRef for the accumulator
*/
class AccumInfo(val infoRef: AccumMetaRef) {
// TODO: use sorted maps for stageIDs and taskIds?
val taskUpdatesMap: mutable.HashMap[Long, Long] =
new mutable.HashMap[Long, Long]()
val stageValuesMap: mutable.HashMap[Int, Long] =
new mutable.HashMap[Int, Long]()
/**
* Maps stageId to a tuple of:
* - Total accumulator value for the stage (Long)
* - Statistical metrics for task-level updates (StatisticsMetrics)
*/
val stageValuesMap: mutable.HashMap[Int, (Long, StatisticsMetrics)] =
new mutable.HashMap[Int, (Long, StatisticsMetrics)]()

/**
* Add accumulable to a stage while:
* 1- handling StageCompleted event.
* 2- handling taskEnd event, we want to make sure that there is an entry that maps the stage to
* the accumulable. The reason is that we want to keep track of the stageIDs in case their
* stageCompleted has never been triggered. It is common case for incomplete eventlogs.
* @param stageId the stageId pulled from the StageCompleted/TaskEnd event
* @param accumulableInfo the accumulableInfo from the StageCompleted/TaskEnd event
* @param update optional long that represents the task update value in case the call was
* triggered by a taskEnd event and the map between stage-Acc has not been
* established yet.
* Adds or updates accumulator information for a stage.
* Called during StageCompleted or TaskEnd events processing.
*
* @param stageId The ID of the stage
* @param accumulableInfo Accumulator information from the event
* @param update Optional task update value for TaskEnd events
*/
def addAccumToStage(stageId: Int,
accumulableInfo: AccumulableInfo,
update: Option[Long] = None): Unit = {
val parsedValue = accumulableInfo.value.flatMap(parseAccumFieldToLong)
// in case there is an out of order event, the value showing up later could be
// lower-than the previous value. In that case we should take the maximum.
val existingValue = stageValuesMap.getOrElse(stageId, 0L)
val existingEntry = stageValuesMap.getOrElse(stageId,
(0L, StatisticsMetrics(0L, 0L, 0L, 0L)))
val incomingValue = parsedValue match {
case Some(v) => v
case _ => update.getOrElse(0L)
}
stageValuesMap.put(stageId, Math.max(existingValue, incomingValue))
val newValue = Math.max(existingEntry._1, incomingValue)
stageValuesMap.put(stageId, (newValue, existingEntry._2))
}

/**
* Add accumulable to a task while handling TaskEnd event.
* This can propagate a call to addAccToStage if the stageId does not exist in the stageValuesMap
* @param stageId the stageId pulled from the TaskEnd event
* @param taskId the taskId pulled from the TaskEnd event
* @param accumulableInfo the accumulableInfo from the TaskEnd event
* Processes task-level accumulator updates and updates stage-level statistics.
* Called during TaskEnd event processing.
*
* @param stageId The ID of the stage containing the task
* @param taskId The ID of the completed task
* @param accumulableInfo Accumulator information from the TaskEnd event
*/
def addAccumToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
// For each incoming Task, we update the statistics with min, rolling average
// max and count of tasks

val parsedUpdateValue = accumulableInfo.update.flatMap(parseAccumFieldToLong)
// we need to update the stageMap if the stageId does not exist in the map
val updateStageFlag = !stageValuesMap.contains(stageId)
// This is for cases where same task updates the same accum multiple times
val existingUpdateValue = taskUpdatesMap.getOrElse(taskId, 0L)
parsedUpdateValue match {
case Some(v) =>
taskUpdatesMap.put(taskId, v + existingUpdateValue)
case None =>
taskUpdatesMap.put(taskId, existingUpdateValue)
}
// update the stage value map if necessary
if (updateStageFlag) {
addAccumToStage(stageId, accumulableInfo, parsedUpdateValue)
parsedUpdateValue.foreach{ value =>
val (total, stats) = stageValuesMap.getOrElse(stageId,
(0L, StatisticsMetrics(value, 0L, value, 0L)))
val newStats = StatisticsMetrics(
Math.min(stats.min, value),
(stats.med * stats.total + value) / ( stats.total + 1),
Math.max(stats.max, value),
stats.total + 1
)
stageValuesMap.put(stageId, (total + value, newStats))
}
}

/**
* Returns all stage IDs that have accumulator updates.
*
* @return Set of stage IDs
*/
def getStageIds: Set[Int] = {
stageValuesMap.keySet.toSet
}

/**
* Returns the smallest stage ID in the accumulator data.
*
* @return Minimum stage ID
*/
def getMinStageId: Int = {
stageValuesMap.keys.min
}

/**
* Calculates aggregate statistics across all stages for this accumulator.
*
* @return Combined StatisticsMetrics including min, median, max, and total
*/
def calculateAccStats(): StatisticsMetrics = {
// do not check stage values because the stats is only meant for task updates
StatisticsMetrics.createFromArr(taskUpdatesMap.map(_._2)(breakOut))
// While reducing we leverage the count stored during TaskEnd processing
// We use it to update the rolling average
// However, the sum ( calculated using sum of all values for stages )
// is returned at the end
val reduced_val = stageValuesMap.values.reduce { (a, b) =>
(a._1 + b._1,
StatisticsMetrics(
Math.min(a._2.min, b._2.min),
(a._2.med * a._2.total + b._2.med * b._2.total) / (a._2.total + b._2.total),
Math.max(a._2.max, b._2.max),
a._2.total + b._2.total
))
}
StatisticsMetrics(
reduced_val._2.min,
reduced_val._2.med,
reduced_val._2.max,
reduced_val._1)
}

/**
* Retrieves statistical metrics for a specific stage.
*
* @param stageId The ID of the stage
* @return Option containing StatisticsMetrics if stage exists, None otherwise
*/
def calculateAccStatsForStage(stageId: Int): Option[StatisticsMetrics] = {
val output = stageValuesMap.get(stageId)
output match {
case Some(x) => Some(StatisticsMetrics(x._2.min, x._2.med, x._2.max, x._1))
case None => None
}
}

/**
* Returns the highest accumulator value across all stages.
*
* @return Option containing maximum value if stages exist, None otherwise
*/
def getMaxStageValue: Option[Long] = {
if (stageValuesMap.values.isEmpty) {
None
} else {
Some(stageValuesMap.values.max)
Some(stageValuesMap.values.map(_._1).max)
}
}
}
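To make the new bookkeeping concrete, here is a self-contained sketch of the scheme (a simplified stand-in, not the tool's classes): `addAccumToTask` folds each task update into a per-stage running aggregate, and `calculateAccStats` merges those aggregates with a weighted average. As in the diff, the `med` slot holds a running integer average of task updates rather than an exact median, and the `count` field below corresponds to what the diff keeps in `StatisticsMetrics.total` while aggregating.

```scala
// Standalone sketch of the on-the-fly aggregation kept per accumulator.
object RunningAccumSketch {
  final case class Stats(min: Long, med: Long, max: Long, count: Long)

  // stageId -> (stage total, running task-update stats)
  val stageValues = scala.collection.mutable.HashMap.empty[Int, (Long, Stats)]

  // Equivalent of addAccumToTask: fold one task update into the stage entry.
  def addTaskUpdate(stageId: Int, value: Long): Unit = {
    val (total, s) = stageValues.getOrElse(stageId, (0L, Stats(value, 0L, value, 0L)))
    val next = Stats(
      math.min(s.min, value),
      (s.med * s.count + value) / (s.count + 1), // running average kept in the "med" slot
      math.max(s.max, value),
      s.count + 1)
    stageValues.put(stageId, (total + value, next))
  }

  // Equivalent of calculateAccStats: weighted merge of the per-stage stats.
  def overallStats(): Stats = {
    val (grandTotal, merged) = stageValues.values.reduce { (a, b) =>
      (a._1 + b._1,
        Stats(
          math.min(a._2.min, b._2.min),
          (a._2.med * a._2.count + b._2.med * b._2.count) / (a._2.count + b._2.count),
          math.max(a._2.max, b._2.max),
          a._2.count + b._2.count))
    }
    Stats(merged.min, merged.med, merged.max, grandTotal)
  }

  def main(args: Array[String]): Unit = {
    Seq(0 -> 10L, 0 -> 30L, 1 -> 5L, 1 -> 55L).foreach { case (sid, v) => addTaskUpdate(sid, v) }
    // Only O(#stages) state is kept per accumulator, instead of one map entry per task.
    println(overallStats()) // Stats(5,25,55,100)
  }
}
```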
@@ -1,15 +1,15 @@
appIndex,appName,appId,sqlId,stageId,stageDurationMs,nodeId,nodeName,outputRowsMin,outputRowsMedian,outputRowsMax,outputRowsTotal,scanTimeMin,scanTimeMedian,scanTimeMax,scanTimeTotal,outputBatchesMin,outputBatchesMedian,outputBatchesMax,outputBatchesTotal,bufferTimeMin,bufferTimeMedian,bufferTimeMax,bufferTimeTotal,shuffleWriteTimeMin,shuffleWriteTimeMedian,shuffleWriteTimeMax,shuffleWriteTimeTotal,fetchWaitTimeMin,fetchWaitTimeMedian,fetchWaitTimeMax,fetchWaitTimeTotal,gpuDecodeTimeMin,gpuDecodeTimeMedian,gpuDecodeTimeMax,gpuDecodeTimeTotal
1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,41434653,60830365,100858775,400284505,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,37444140,92128351,108992798,508750471,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,139875,230038,9747416,93193331,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",1666666,1666666,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,41434653,66714083,100858775,400284505,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",1666666,1666666,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,37444140,84791745,108992798,508750471,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,139875,465911,9747416,93193331,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,4,"GpuHashAggregate",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,6,"GpuShuffledHashJoin",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,7,"GpuShuffleCoalesce",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,6,"GpuShuffledHashJoin",49480,49948,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,7,"GpuShuffleCoalesce",49480,49948,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,8,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,14,"GpuCoalesceBatches",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,15,"GpuShuffleCoalesce",49480,50017,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,14,"GpuCoalesceBatches",49480,49948,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,15,"GpuShuffleCoalesce",49480,49948,50559,10000000,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,16,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,3,83,1,"GpuHashAggregate",1,1,1,1,0,0,0,0,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,3,83,2,"GpuShuffleCoalesce",200,200,200,200,0,0,0,0,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
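Only the median columns change in this expectation file (the min, max, and total columns are untouched), which is consistent with the median now being approximated by a running integer average instead of computed exactly from all task updates. A hypothetical three-task illustration of the difference:

```scala
// Exact median vs. running integer average over made-up task updates.
object MedianApproxSketch {
  def main(args: Array[String]): Unit = {
    val updates = Seq(1L, 2L, 9L)                             // hypothetical task updates
    val exactMedian = updates.sorted.apply(updates.size / 2)  // 2
    val runningAvg = updates.foldLeft((0L, 0L)) { case ((avg, n), v) =>
      ((avg * n + v) / (n + 1), n + 1)                        // integer division, as in the diff
    }._1                                                      // 3
    println((exactMedian, runningAvg))
  }
}
```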