Add safeguards to prevent older attempts from generating metrics output in Scala Tool (NVIDIA#1324)

* Add safeguards to prevent older attempts from generating qual summary output
* Adding synchronization on the reporting and autotuner level (#54)
* Fix failing scala unit tests by providing unique app IDs in test (#53)
* Ignore test for event logs with same app Id and attempt Id

---------

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
Co-authored-by: Ahmed Hussein (amahussein) <a@ahussein.me>
parthosa and amahussein authored Sep 6, 2024
1 parent 277e951 commit 4f2d6e0
Showing 23 changed files with 321 additions and 109 deletions.
@@ -175,7 +175,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
         processSuccessApp(app)
         progressBar.foreach(_.reportSuccessfulProcess())
         val endTime = System.currentTimeMillis()
-        SuccessAppResult(pathStr, app.appId, s"Took ${endTime - startTime}ms to process")
+        SuccessAppResult(pathStr, app.appId, app.attemptId,
+          s"Took ${endTime - startTime}ms to process")
       }
       // Log the information to the console
       profAppResult.logMessage()
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.qualification

class AppSubscriber(val appId: String) {
  val lock = new Object()
  private var attemptID: Option[Int] = None

  def unsafeSetAttemptId(newAttempt: Int): Boolean = {
    attemptID match {
      case Some(a) =>
        if (newAttempt > a) {
          attemptID = Some(newAttempt)
        }
      case None => attemptID = Some(newAttempt)
    }
    newAttempt == attemptID.get
  }

  def safeSetAttemptId(newAttempt: Int): Boolean = {
    lock.synchronized {
      unsafeSetAttemptId(newAttempt)
    }
  }
}

object AppSubscriber {
  private val APP_SUBSCRIBERS = new java.util.concurrent.ConcurrentHashMap[String, AppSubscriber]()

  def getOrCreate(appId: String): AppSubscriber = {
    APP_SUBSCRIBERS.computeIfAbsent(appId, _ => new AppSubscriber(appId))
  }

  def subscribeAppAttempt(appId: String, newAttemptId: Int): Boolean = {
    val subscriber = getOrCreate(appId)
    subscriber.safeSetAttemptId(newAttemptId)
  }

  def withSafeValidAttempt[T](appId: String, currAttempt: Int)(f: () => T): Option[T] = {
    val subscriber = getOrCreate(appId)
    subscriber.lock.synchronized {
      if (subscriber.unsafeSetAttemptId(currAttempt)) {
        Option(f())
      } else {
        None
      }
    }
  }

  def withUnsafeValidAttempt[T](appId: String, currAttempt: Int)(f: () => T): Option[T] = {
    val subscriber = getOrCreate(appId)
    if (subscriber.unsafeSetAttemptId(currAttempt)) {
      Option(f())
    } else {
      None
    }
  }
}
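The subscriber keeps only the highest attempt ID seen per application, so a closure passed to withSafeValidAttempt runs only while its caller still holds the newest attempt. A minimal usage sketch (the app ID and return values are hypothetical, not part of this commit):

// Attempt 2 registers first; a later call for the stale attempt 1 yields None.
val v2 = AppSubscriber.withSafeValidAttempt("app-123", 2)(() => "summary-v2") // Some("summary-v2")
val v1 = AppSubscriber.withSafeValidAttempt("app-123", 1)(() => "summary-v1") // None, since 1 < 2
val v3 = AppSubscriber.withSafeValidAttempt("app-123", 3)(() => "summary-v3") // Some("summary-v3"); 3 is now the latest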
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.qualification

-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.JavaConverters._

@@ -41,7 +41,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,

   override val simpleName: String = "qualTool"
   override val outputDir = s"$outputPath/rapids_4_spark_qualification_output"
-  private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()
+  private val allApps = new ConcurrentHashMap[String, QualificationSummaryInfo]()

   override def getNumThreads: Int = nThreads
@@ -72,7 +72,7 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
       threadPool.shutdownNow()
     }
     progressBar.foreach(_.finishAll())
-    val allAppsSum = estimateAppFrequency(allApps.asScala.toSeq)
+    val allAppsSum = estimateAppFrequency(allApps.asScala.values.toSeq)
     // sort order and limit only applies to the report summary text file,
     // the csv file we write the entire data in descending order
     val sortedDescDetailed = sortDescForDetailedReport(allAppsSum)
@@ -162,27 +162,48 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
         // this is a bit ugly right now to overload writing out the report and returning the
         // DataSource information but this encapsulates the analyzer to keep the memory usage
         // smaller.
-        val dsInfo = QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo(
-          outputDir, app, appIndex)
+        val dsInfo =
+          AppSubscriber.withSafeValidAttempt(app.appId, app.attemptId) { () =>
+            QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo(
+              outputDir, app, appIndex)
+          }.getOrElse(Seq.empty)
         val qualSumInfo = app.aggregateStats()
-        tunerContext.foreach { tuner =>
-          // Run the autotuner if it is enabled.
-          // Note that we call the autotuner anyway without checking the aggregate results
-          // because the Autotuner can still make some recommendations based on the information
-          // enclosed by the QualificationInfo object
-          tuner.tuneApplication(app, qualSumInfo, appIndex, dsInfo)
+        AppSubscriber.withSafeValidAttempt(app.appId, app.attemptId) { () =>
+          tunerContext.foreach { tuner =>
+            // Run the autotuner if it is enabled.
+            // Note that we call the autotuner anyway without checking the aggregate results
+            // because the Autotuner can still make some recommendations based on the information
+            // enclosed by the QualificationInfo object
+            tuner.tuneApplication(app, qualSumInfo, appIndex, dsInfo)
+          }
         }
         if (qualSumInfo.isDefined) {
           // add the recommend cluster info into the summary
           val tempSummary = qualSumInfo.get
           val newClusterSummary = tempSummary.clusterSummary.copy(
             recommendedClusterInfo = pluginTypeChecker.platform.recommendedClusterInfo)
-          val newQualSummary = tempSummary.copy(clusterSummary = newClusterSummary)
-          allApps.add(newQualSummary)
-          progressBar.foreach(_.reportSuccessfulProcess())
-          val endTime = System.currentTimeMillis()
-          SuccessAppResult(pathStr, app.appId,
-            s"Took ${endTime - startTime}ms to process")
+          AppSubscriber.withSafeValidAttempt(app.appId, app.attemptId) { () =>
+            val newQualSummary = tempSummary.copy(clusterSummary = newClusterSummary)
+            // check if the app is already in the map
+            if (allApps.containsKey(app.appId)) {
+              // fix the progress bar counts
+              progressBar.foreach(_.adjustCounterForMultipleAttempts())
+              logInfo(s"Removing older app summary for app: ${app.appId} " +
+                s"before adding the new one with attempt: ${app.attemptId}")
+            }
+            progressBar.foreach(_.reportSuccessfulProcess())
+            allApps.put(app.appId, newQualSummary)
+            val endTime = System.currentTimeMillis()
+            SuccessAppResult(pathStr, app.appId, app.attemptId,
+              s"Took ${endTime - startTime}ms to process")
+          } match {
+            case Some(successfulResult) => successfulResult
+            case _ =>
+              // If the attemptId is an older attemptId, skip this attempt.
+              // This can happen when the user has provided event logs for multiple attempts
+              progressBar.foreach(_.reportSkippedProcess())
+              SkippedAppResult.fromAppAttempt(pathStr, app.appId, app.attemptId)
+          }
         } else {
           progressBar.foreach(_.reportUnkownStatusProcess())
           UnknownAppResult(pathStr, app.appId,
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -27,6 +27,7 @@ import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventL
 import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser}
 import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode
 import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DataSourceCase, DriverAccumCase, JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase}
+import com.nvidia.spark.rapids.tool.qualification.AppSubscriber
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

@@ -53,6 +54,8 @@ abstract class AppBase(
     }
   }

+  lazy val attemptId: Int = appMetaData.map(_.attemptId).getOrElse(1)
+
   // Store map of executorId to executor info
   val executorIdToInfo = new HashMap[String, ExecutorInfoClass]()
   // resourceProfile id to resource profile info
@@ -396,7 +399,20 @@
     probNotDataset.values.flatten.toSet.toSeq
   }

+  /**
+   * Registers the attempt ID for the application and updates the tracker map if the attemptId is
+   * greater than the existing attemptId.
+   */
+  def registerAttemptId(): Unit = {
+    if (isAppMetaDefined) {
+      val currentAttemptId = sparkProperties.getOrElse("spark.app.attempt.id", "1").toInt
+      appMetaData.foreach(_.setAttemptId(currentAttemptId))
+      AppSubscriber.subscribeAppAttempt(appId, currentAttemptId)
+    }
+  }
+
   protected def postCompletion(): Unit = {
+    registerAttemptId()
     calculateAppDuration()
   }

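For context, attempt resolution in registerAttemptId boils down to a single property lookup. A minimal sketch with assumed property contents (the map below is illustrative, not from this commit):

// First attempt: no explicit attempt ID in the Spark properties, so default to 1.
val sparkProperties = Map("spark.app.id" -> "app-123")
val currentAttemptId = sparkProperties.getOrElse("spark.app.attempt.id", "1").toInt // => 1
// A re-submitted attempt would carry "spark.app.attempt.id" -> "2" and resolve to 2.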
@@ -29,14 +29,16 @@ import org.apache.spark.ui.UIUtils
  * @param sparkUser user who ran the Spark application
  * @param startTime startTime of a Spark application
  * @param endTime endTime of the spark Application
+ * @param attemptId attemptId of the application
  */
 class AppMetaData(
     val eventLogPath: Option[String],
     val appName: String,
     val appId: Option[String],
     val sparkUser: String,
     val startTime: Long,
-    var endTime: Option[Long] = None) {
+    var endTime: Option[Long] = None,
+    var attemptId: Int = 1) {

   // Calculated as time in ms
   var duration: Option[Long] = _
@@ -75,6 +77,10 @@ class AppMetaData(

   def isDurationEstimated: Boolean = durationEstimated

+  def setAttemptId(attemptId: Int): Unit = {
+    this.attemptId = attemptId
+  }
+
   // Initialization code:
   // - Calculate the duration based on the constructor argument endTime
   calculateDurationInternal()
@@ -290,6 +290,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
       event: SparkListenerEnvironmentUpdate): Unit = {
     logDebug("Processing event: " + event.getClass)
     app.handleEnvUpdateForCachedProps(event)
+    app.registerAttemptId()
   }

   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
@@ -119,6 +119,11 @@ class ConsoleProgressBar(
     }
   }

+  def adjustCounterForMultipleAttempts(): Unit = {
+    successCounter.decrementAndGet()
+    skippedCounter.incrementAndGet()
+  }
+
   def reportSuccessfulProcess(): Unit = {
     successCounter.incrementAndGet()
     totalCounter.incrementAndGet()
@@ -38,6 +38,7 @@ class AppResult(path: String, message: String) extends Logging {
 case class SuccessAppResult(
     path: String,
     appId: String,
+    attemptId: Int,
     message: String = "") extends AppResult(path, message) {
   override def logMessage(exp: Option[Exception] = None): Unit = {
     logInfo(s"File: $path, Message: $message")
@@ -52,3 +53,10 @@ case class UnknownAppResult(path: String, appId: String, message: String)

 case class SkippedAppResult(path: String, message: String)
   extends AppResult(path, message) {}
+
+object SkippedAppResult {
+  def fromAppAttempt(path: String, appId: String, attemptId: Int): SkippedAppResult = {
+    SkippedAppResult(path, s"For App ID '$appId'; skipping this " +
+      s"attempt $attemptId as a newer attemptId is being processed")
+  }
+}
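For illustration, the helper formats its skip message as follows (hypothetical path and IDs):

SkippedAppResult.fromAppAttempt("/logs/app-123-1", "app-123", 1)
// => SkippedAppResult("/logs/app-123-1",
//      "For App ID 'app-123'; skipping this attempt 1 as a newer attemptId is being processed")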
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.rapids.tool.util

 import com.nvidia.spark.rapids.tool.profiling.AppStatusResult
+import com.nvidia.spark.rapids.tool.qualification.AppSubscriber
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.internal.Logging
@@ -27,18 +28,33 @@ trait RuntimeReporter extends Logging {
     RuntimeUtil.generateReport(outputDir, hadoopConf)
   }

+  /**
+   * Updates the status of "SUCCESS" applications to "SKIPPED" if newer attempts with
+   * the same appId exist.
+   */
+  private def skipAppsWithOlderAttempts(appStatuses: Seq[AppResult]): Seq[AppResult] = {
+    appStatuses map {
+      case successApp: SuccessAppResult =>
+        AppSubscriber.withUnsafeValidAttempt(successApp.appId, successApp.attemptId) { () =>
+          successApp
+        }.getOrElse(SkippedAppResult.fromAppAttempt(successApp.path, successApp.appId,
+          successApp.attemptId))
+      case otherApp: AppResult => otherApp
+    }
+  }

   /**
    * For each app status report, generate an AppStatusResult.
    * If appId is empty, convert to "N/A" in the output.
    * @return Seq[AppStatusResult] - Seq[(path, status, appId, message)]
    */
   def generateStatusResults(appStatuses: Seq[AppResult]): Seq[AppStatusResult] = {
-    appStatuses.map {
+    skipAppsWithOlderAttempts(appStatuses).map {
       case FailureAppResult(path, message) =>
         AppStatusResult(path, "FAILURE", "N/A", message)
       case SkippedAppResult(path, message) =>
         AppStatusResult(path, "SKIPPED", "N/A", message)
-      case SuccessAppResult(path, appId, message) =>
+      case SuccessAppResult(path, appId, _, message) =>
         AppStatusResult(path, "SUCCESS", appId, message)
       case UnknownAppResult(path, appId, message) =>
         val finalAppId = if (appId.isEmpty) "N/A" else appId
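Taken together, the reporter demotes stale SUCCESS entries at the end of a run. A sketch with made-up paths and IDs, assuming attempt 2 was registered with AppSubscriber while the logs were processed:

// Both attempts were processed, but only the latest survives as SUCCESS.
AppSubscriber.subscribeAppAttempt("app-123", 2)
val statuses = Seq(
  SuccessAppResult("/logs/app-123/attempt-1", "app-123", 1, "Took 10ms to process"),
  SuccessAppResult("/logs/app-123/attempt-2", "app-123", 2, "Took 12ms to process"))
// generateStatusResults(statuses) reports attempt 1 as SKIPPED and attempt 2 as SUCCESS.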

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading
