Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Qualification tool triggers the AutoTuner module #739

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.tuning.QualAppSummaryInfoProvider

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

case class ApplicationSummaryInfo(
appInfo: Seq[AppInfoProfileResults],
Expand Down Expand Up @@ -201,4 +204,16 @@ object AppSummaryInfoBaseProvider {
case _ => new AppSummaryInfoBaseProvider()
}
}

/**
* Constructs an application information provider based on the results of Qualification
* tool.
* @param appInfo
* @param appAggStats optional aggregate of application stats
* @return object that can be used by the AutoTuner to calculate the recommendations
*/
def fromQualAppInfo(appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo] = None): AppSummaryInfoBaseProvider = {
new QualAppSummaryInfoProvider(appInfo, appAggStats)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import com.nvidia.spark.rapids.tool.tuning.TunerContext
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.rapids.tool.qualification._
Expand All @@ -34,7 +35,8 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
penalizeTransitions: Boolean) extends RuntimeReporter {
penalizeTransitions: Boolean,
tunerContext: Option[TunerContext]) extends RuntimeReporter {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

Expand Down Expand Up @@ -181,6 +183,19 @@ class Qualification(outputPath: String, numRows: Int, hadoopConf: Configuration,
case Right(app: QualificationAppInfo) =>
// Case with successful creation of QualificationAppInfo
val qualSumInfo = app.aggregateStats()
tunerContext.collect {
// Run the autotuner if it is enabled.
// Note that we call the autotuner anyway without checking the aggregate results
// because the Autotuner can still make some recommendations based on the information
// enclosed by the QualificationInfo object
case tuner =>
// autoTuner is enabled for Qualification
tuner.tuneApplication(app, qualSumInfo).collect {
case res =>
logInfo(s"RecommendedProps ${app.appId} = ${res.recommendations.mkString("\n")}")
logInfo(s"Comments ${app.appId} = ${res.comments.mkString("\n")}")
}
}
if (qualSumInfo.isDefined) {
allApps.add(qualSumInfo.get)
progressBar.foreach(_.reportSuccessfulProcess())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.profiling.AutoTuner
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

Expand Down Expand Up @@ -164,6 +165,16 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Custom speedup factor file used to get estimated GPU speedup that is specific " +
"to the user's environment. If the file is not provided, it defaults to use the " +
"speedup files included in the jar.")
val autoTuner: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Toggle AutoTuner module.",
default = Some(false))
val workerInfo: ScallopOption[String] =
opt[String](required = false,
descr = "File path containing the system information of a worker node. It is assumed " +
"that all workers are homogenous. It requires the AutoTuner to be enabled. Default is " +
"./worker_info.yaml",
default = Some(AutoTuner.DEFAULT_WORKER_INFO_PATH))

validate(order) {
case o if (QualificationArgs.isOrderAsc(o) || QualificationArgs.isOrderDesc(o)) => Right(Unit)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.tuning.TunerContext

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
Expand Down Expand Up @@ -58,15 +59,21 @@ object QualificationMain extends Logging {
val order = appArgs.order.getOrElse("desc")
val uiEnabled = appArgs.htmlReport.getOrElse(false)
val reportSqlLevel = appArgs.perSql.getOrElse(false)
val platform = appArgs.platform()
val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
val penalizeTransitions = appArgs.penalizeTransitions.getOrElse(true)

val hadoopConf = RapidsToolsConfUtil.newHadoopConf
val platform = try {
PlatformFactory.createInstance(appArgs.platform())
} catch {
case ie: IllegalStateException =>
logError("Error creating the platform", ie)
return (1, Seq[QualificationSummaryInfo]())
}

val pluginTypeChecker = try {
new PluginTypeChecker(
PlatformFactory.createInstance(platform),
platform,
appArgs.speedupFactorFile.toOption)
} catch {
case ie: IllegalStateException =>
Expand All @@ -93,10 +100,16 @@ object QualificationMain extends Logging {
logWarning("No event logs to process after checking paths, exiting!")
return (0, Seq[QualificationSummaryInfo]())
}

// create the AutoTuner context object
val tunerContext = if (appArgs.autoTuner()) {
TunerContext(platform, appArgs.workerInfo(), outputDirectory, Option(hadoopConf))
} else {
None
}
val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions)
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions,
tunerContext)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import com.nvidia.spark.rapids.tool.profiling.AppSummaryInfoBaseProvider

import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

/**
* Implementation of AppInfoPropertyGetter to wrap the output of the Qualification analysis.
* @param appInfo the main QualificationAppInfo object representing the CPU application.
* @param appAggStats optional stats aggregate is included here for future improvement as we may
* need to feed the autotuner with values from the aggregates.
*/
class QualAppSummaryInfoProvider(
val appInfo: QualificationAppInfo,
val appAggStats: Option[QualificationSummaryInfo]) extends AppSummaryInfoBaseProvider {
override def isAppInfoAvailable = true
private def findPropertyInternal(
key: String, props: collection.Map[String, String]): Option[String] = {
props.get(key)
}

override def getSparkProperty(propKey: String): Option[String] = {
findPropertyInternal(propKey, appInfo.sparkProperties)
}

override def getRapidsProperty(propKey: String): Option[String] = {
getSparkProperty(propKey)
}

override def getSystemProperty(propKey: String): Option[String] = {
findPropertyInternal(propKey, appInfo.systemProperties)
}

override def getSparkVersion: Option[String] = {
Option(appInfo.sparkVersion)
}

def getAppID: String = appInfo.appId
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.profiling.{AppSummaryInfoBaseProvider, AutoTuner, Profiler}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}

/**
* Implementation of the AutoTuner for Qualification.
* @param appInfoProvider Provider of the qualification analysis data
* @param tunerContext Container which holds the arguments passed to the AutoTuner execution
*/
class QualificationAutoTuner(val appInfoProvider: QualAppSummaryInfoProvider,
val tunerContext: TunerContext) {

private def writeTuningReport(tuningResult: TuningResult,
outputDir: String, hadoopConf: Configuration): Unit = {
val textFileWriter = new ToolTextFileWriter(outputDir,
s"${tuningResult.appID}.log", s"Tuning Qual app - ${tuningResult.appID}", Option(hadoopConf))
try {
textFileWriter.write(s"### Recommended Configuration for App: ${tuningResult.appID} ###\n")
textFileWriter.write(Profiler.getAutoTunerResultsAsString(
tuningResult.recommendations, tuningResult.comments))
} finally {
textFileWriter.close()
}
}
def runAutoTuner(): TuningResult = {
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(
tunerContext.workerInfoPath, appInfoProvider, tunerContext.platform)
val (recommendations, comments) = autoTuner.getRecommendedProperties()
val resultRecord = TuningResult(appInfoProvider.getAppID, recommendations, comments)
writeTuningReport(resultRecord, tunerContext.getOutputPath, tunerContext.hadoopConf)
resultRecord
}
}

object QualificationAutoTuner extends Logging {
def apply(appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo],
tunerContext: TunerContext): Option[QualificationAutoTuner] = {
Try {
val qualInfoProvider: QualAppSummaryInfoProvider =
AppSummaryInfoBaseProvider.fromQualAppInfo(appInfo, appAggStats)
.asInstanceOf[QualAppSummaryInfoProvider]
new QualificationAutoTuner(qualInfoProvider, tunerContext)
} match {
case Success(q) => Some(q)
case Failure(e) =>
logError(
s"Failed to create Qualification tuning object for application ${appInfo.appId}", e)
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.tuning

import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.Platform
import com.nvidia.spark.rapids.tool.profiling.{RecommendedCommentResult, RecommendedPropertyResult}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.{QualificationAppInfo, QualificationSummaryInfo}
import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil

case class TuningResult(
appID: String,
recommendations: Seq[RecommendedPropertyResult],
comments: Seq[RecommendedCommentResult])

/**
* Container which holds metadata and arguments specific to the execution of the AutoTuner.
* TODO: we need to use the same class in constructing the AutoTuner in the Profiling tools.
* @param platform object representing the host platform on which the application was executed.
* @param workerInfoPath the path of the GPU workers
* @param outputRootDir the output directory to dump the recommendation/comments.
* @param hadoopConf optional configuration to access the remote storage.
*/
case class TunerContext (
platform: Platform,
workerInfoPath: String,
outputRootDir: String,
hadoopConf: Configuration) extends Logging {

def getOutputPath: String = {
s"$outputRootDir/rapids_4_spark_qualification_output/tuning"
}

def tuneApplication(
appInfo: QualificationAppInfo,
appAggStats: Option[QualificationSummaryInfo]): Option[TuningResult] = {
QualificationAutoTuner(appInfo, appAggStats, this).collect {
case qualTuner =>
Try {
qualTuner.runAutoTuner()
} match {
case Success(r) => r
case Failure(e) =>
logError(s"Failed to generate tuning recommendations for app: ${appInfo.appId}", e)
null
}
}
}
}

object TunerContext extends Logging {
def apply(platform: Platform,
workerInfoPath: String,
outputRootDir: String,
hadoopConf: Option[Configuration] = None): Option[TunerContext] = {
Try {
val hConf = hadoopConf.getOrElse(RapidsToolsConfUtil.newHadoopConf())
TunerContext(platform, workerInfoPath, outputRootDir, hConf)
} match {
case Success(c) => Some(c)
case Failure(e) =>
logError("Could not create Tuner Context", e)
None
}
}
}
Loading