Generate a detailed report for the write ops #1544

Merged: 2 commits, Feb 19, 2025
Changes from 1 commit
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@ package com.nvidia.spark.rapids.tool.planparser
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.store.{WriteOperationMetaBuilder, WriteOperationMetadataTrait}
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class DataWritingCommandExecParser(
node: SparkPlanGraphNode,
@@ -106,13 +108,13 @@ object DataWritingCommandExecParser {
// This is used for expressions that do not show the format as part of the description.
private val specialWriteFormatMap = Map[String, String](
// if appendDataExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
-appendDataExecV1 -> "unknown",
+appendDataExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if overwriteByExprExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
-overwriteByExprExecV1 -> "unknown",
+overwriteByExprExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if atomicReplaceTableExec is not deltaLakeProvider, then we want to mark it as unsupported
-atomicReplaceTableExec -> "unknown",
+atomicReplaceTableExec -> StringUtils.UNKNOWN_EXTRACT,
// if atomicCreateTableExec is not deltaLakeProvider, then we want to mark it as unsupported
-atomicCreateTableExec -> "unknown"
+atomicCreateTableExec -> StringUtils.UNKNOWN_EXTRACT
)

// Checks whether a node is a write CMD Exec
@@ -175,4 +177,166 @@ object DataWritingCommandExecParser {
parsedString.split(",")(0) // return third parameter from the input string
}
}

/**
* Extracts metadata information from a write operation node description.
* This method is specifically designed to parse the description of
* `InsertIntoHadoopFsRelationCommand` nodes and extract relevant details
* such as the output path, data format, write mode, catalog information,
* and output columns.
*
* The method performs the following steps:
* - Extracts the output path and data format from the node description.
* - Determines the write mode (e.g., Append, Overwrite) based on specific keywords in the
* description.
* - Extracts catalog information (database and table name) from the output path.
* - Extracts the output columns if available in the description.
* - Builds and returns a `WriteOperationMetadataTrait` object encapsulating the extracted
* metadata.
*
* This method includes error handling to ensure graceful fallback to default values
* (e.g., `UNKNOWN_EXTRACT`) in case of unexpected input or parsing errors.
*
* @param execName The name of the execution command (e.g., `InsertIntoHadoopFsRelationCommand`).
* @param nodeDescr The description of the node, typically containing details about the write
* operation.
* @return A `WriteOperationMetadataTrait` object containing the extracted metadata.
*/
private def extractWriteOpRecord(
execName: String, nodeDescr: String): WriteOperationMetadataTrait = {
// Helper function to extract catalog information (database and table name) from the output
// path.
def extractCatalog(path: String): (String, String) = {
try {
// Split the URI into parts by "/"
val pathParts = path.split("/").filter(_.nonEmpty)
if (pathParts.length >= 2) {
// Extract the last two parts as database and table name
val database = pathParts(pathParts.length - 2)
val tableName = pathParts.last
(database, tableName)
} else {
// If not enough parts, return UNKNOWN_EXTRACT
(StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
} catch {
// Handle any unexpected errors gracefully
case _: Exception => (StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
}

// Helper function to extract the output path and data format from the node description.
def extractPathAndFormat(args: Array[String]): (String, String) = {
// Extract the path from the first argument
val path =
args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
// Extract the data format from the third argument
val thirdArg = args.lift(2).getOrElse("").trim
val format = if (thirdArg.startsWith("[")) {
// Optional parameter is present in the eventlog. Get the fourth parameter by skipping the
// optional parameter string.
thirdArg.split("(?<=],)")
.map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
} else {
thirdArg.split(",").headOption.getOrElse("").trim
}
(path, format)
}

// Helper function to determine the write mode (e.g., Append, Overwrite) from the description.
def extractWriteMode(description: String): String = {
val modes = Map(
", Append," -> "Append",
", Overwrite," -> "Overwrite",
", ErrorIfExists," -> "ErrorIfExists",
", Ignore," -> "Ignore"
)
// Match the description against known write modes
modes.collectFirst { case (key, mode) if description.contains(key) => mode }
.getOrElse(StringUtils.UNKNOWN_EXTRACT)
}

// Helper function to extract output columns from the node description.
def extractOutputColumns(description: String): Option[String] = {
// Use a regular expression to find column definitions enclosed in square brackets
val columnsRegex = """\[(.*?)\]""".r
columnsRegex.findAllMatchIn(description).map(_.group(1)).toList.lastOption
.map(_.replaceAll(",\\s+", ";")) // Replace commas with semicolons for better readability
}

// Parse the node description into arguments
val splitArgs = nodeDescr.split(",", 3)

// Extract the output path and data format
val (path, format) = extractPathAndFormat(splitArgs)

// Extract the write mode (e.g., Append, Overwrite)
val writeMode = extractWriteMode(nodeDescr)

// Extract catalog information (database and table name) from the output path
val (catalogDB, catalogTable) = extractCatalog(path)

// Extract the output columns, if available
val outColumns = extractOutputColumns(nodeDescr)

// Build and return the metadata object encapsulating all extracted information
WriteOperationMetaBuilder.build(
execName = execName,
dataFormat = format,
outputPath = Option(path),
outputColumns = outColumns,
writeMode = writeMode,
tableName = catalogTable,
dataBaseName = catalogDB,
fullDescr = Some(nodeDescr)
)
}
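A hypothetical, REPL-style trace of the helpers above. The description string is an illustrative stand-in for an InsertIntoHadoopFsRelationCommand node description (not copied from a real eventlog), and the two vals re-implement the nested private helpers inline:

// Hypothetical node description, for illustration only.
val desc = "Execute InsertIntoHadoopFsRelationCommand file:/warehouse/mydb/mytable, " +
  "false, Parquet, [serialization.format=1], Append, " +
  "org.apache.spark.sql.execution.datasources.InMemoryFileIndex, [col_a, col_b]"

// On this string, extractPathAndFormat yields path = "file:/warehouse/mydb/mytable" and
// format = "Parquet"; extractCatalog(path) then yields ("mydb", "mytable").

// Mirrors extractWriteMode: look for ", Append,", ", Overwrite,", ", ErrorIfExists,", ", Ignore,".
val writeMode = Seq("Append", "Overwrite", "ErrorIfExists", "Ignore")
  .find(m => desc.contains(s", $m,"))
  .getOrElse("unknown")                           // StringUtils.UNKNOWN_EXTRACT in the real helper; -> "Append"

// Mirrors extractOutputColumns: take the last bracketed group and turn ", " into ";".
val outCols = """\[(.*?)\]""".r
  .findAllMatchIn(desc).map(_.group(1)).toList.lastOption
  .map(_.replaceAll(",\\s+", ";"))                // -> Some("col_a;col_b")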

/**
* Extracts metadata information from a given SparkPlanGraphNode representing a write operation.
*
* This method determines the type of write operation (e.g., Delta Lake or other supported
* commands) and extracts relevant metadata such as execution name, data format, output path,
* write mode, and catalog information. It uses helper methods to parse the node description
* and build a metadata object encapsulating the extracted details.
*
* The method performs the following steps:
* 1. Retrieves the node description from the provided SparkPlanGraphNode.
* 2. Checks if the node is a Delta Lake write operation using DeltaLakeHelper.
* 3. Retrieves the appropriate command wrapper (logical or physical) for the node.
* 4. If the command is `InsertIntoHadoopFsRelationCommand`, it invokes a specialized method
* `extractWriteOpRecord` to extract detailed metadata.
* 5. For other commands, it builds a metadata object using the command wrapper's information.
* 6. If no command wrapper is found, it falls back to building a metadata object with minimal
* information.
*
* @param node The SparkPlanGraphNode representing the write operation.
* @return A WriteOperationMetadataTrait object containing the extracted metadata.
*/
def getWriteOpMetaFromNode(node: SparkPlanGraphNode): WriteOperationMetadataTrait = {
// Determine the appropriate command wrapper based on whether the node is a Delta Lake write
// operation.
val cmdWrapper = if (DeltaLakeHelper.acceptsWriteOp(node)) {
DeltaLakeHelper.getWriteCMDWrapper(node)
} else {
getWriteCMDWrapper(node)
}
// Process the command wrapper to extract metadata
cmdWrapper match {
case Some(cmdWrapper) =>
// If the command is InsertIntoHadoopFsRelationCommand, extract detailed metadata.
if (cmdWrapper.execName == DataWritingCommandExecParser.insertIntoHadoopCMD) {
extractWriteOpRecord(cmdWrapper.execName, node.desc)
} else {
// For other commands, build metadata using the command wrapper's information.
WriteOperationMetaBuilder.build(
execName = cmdWrapper.execName,
dataFormat = cmdWrapper.dataFormat,
fullDescr = Some(node.desc))
}
case _ =>
// If no command wrapper is found, build metadata with minimal information.
WriteOperationMetaBuilder.buildNoMeta(Some(node.desc))
}
}
}
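A minimal caller-side sketch of getWriteOpMetaFromNode, assuming a SparkPlanGraph from Spark's org.apache.spark.sql.execution.ui package (allNodes is its existing accessor). A real caller would normally gate on the write-exec check first; without the gate, non-write nodes simply take the buildNoMeta fallback shown above:

import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.rapids.tool.store.WriteOperationMetadataTrait

// Collect write-operation metadata for every node in a plan graph.
def collectWriteOps(graph: SparkPlanGraph): Seq[WriteOperationMetadataTrait] =
  graph.allNodes.map(DataWritingCommandExecParser.getWriteOpMetaFromNode).toSeq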
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.{SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.SqlPlanInfoGraphEntry
import org.apache.spark.sql.rapids.tool.util.StringUtils

// A class used to handle the DL writeOps such as:
// - AppendDataExecV1
@@ -135,8 +136,6 @@ object DeltaLakeHelper {
def parseNode(node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long): ExecInfo = {
-val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
-opExec.parse
node match {
case n if acceptsWriteOp(n) =>
val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
@@ -145,6 +144,25 @@
}
}

/**
* Get the write command wrapper for the given deltaLake exec node.
* This method should be called only if the node passes the `acceptsWriteOp` check.
* @param node the deltaLake write exec
* @return the write command wrapper
*/
def getWriteCMDWrapper(node: SparkPlanGraphNode): Option[DataWritingCmdWrapper] = {
val wcmd = exclusiveDeltaExecs.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ =>
deltaExecsFromSpark.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ => StringUtils.UNKNOWN_EXTRACT
}
}
// The format must be delta
Some(DataWritingCmdWrapper(wcmd, DataWritingCommandExecParser.dataWriteCMD, getWriteFormat))
}
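A review-style aside: the nested match above could also be written as an orElse chain with the same precedence (exclusive Delta execs, then the Delta execs reported through Spark, then UNKNOWN_EXTRACT). A possible drop-in for the method body, not a required change:

val wcmd = exclusiveDeltaExecs.find(node.name.contains(_))
  .orElse(deltaExecsFromSpark.find(node.name.contains(_)))
  .getOrElse(StringUtils.UNKNOWN_EXTRACT)
// The format must be delta
Some(DataWritingCmdWrapper(wcmd, DataWritingCommandExecParser.dataWriteCMD, getWriteFormat))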

// Kept for future use if we find that SerDe library can be used to deduce any information to
// reflect on the support of the Op
def getSerdeLibrary(nodeDesc: String): Option[String] = {
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.planparser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
-import org.apache.spark.sql.rapids.tool.util.EventUtils
+import org.apache.spark.sql.rapids.tool.util.{EventUtils, StringUtils}

// A wrapper class to map between
case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
@@ -68,7 +68,7 @@ object HiveParseHelper extends Logging {
}

def getHiveFormatFromSimpleStr(str: String): String = {
-LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse("unknown")
+LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse(StringUtils.UNKNOWN_EXTRACT)
}
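A hypothetical call against the fallback introduced above; the SerDe class name is made up, and the real class-to-format mappings live in LOADED_SERDE_CLASSES:

// No registered SerDe class matches, so the method returns StringUtils.UNKNOWN_EXTRACT
// rather than a hard-coded literal.
HiveParseHelper.getHiveFormatFromSimpleStr("com.example.MadeUpSerDe")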

// Given a "scan hive" NodeGraph, construct the MetaData based on the SerDe class.
@@ -22,6 +22,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class ReadMetaData(schema: String, location: String, format: String,
tags: Map[String, String] = ReadParser.DEFAULT_METAFIELD_MAP) {
@@ -60,7 +61,7 @@ object ReadParser extends Logging {
val METAFIELD_TAG_FORMAT = "Format"
val METAFIELD_TAG_LOCATION = "Location"

-val UNKNOWN_METAFIELD: String = "unknown"
+val UNKNOWN_METAFIELD: String = StringUtils.UNKNOWN_EXTRACT
val DEFAULT_METAFIELD_MAP: Map[String, String] = collection.immutable.Map(
METAFIELD_TAG_DATA_FILTERS -> UNKNOWN_METAFIELD,
METAFIELD_TAG_PUSHED_FILTERS -> UNKNOWN_METAFIELD,
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import org.apache.spark.sql.rapids.tool.util.StringUtils

class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait {
// Preformatted values for CSV output to avoid reformatting multiple times.
-val csvValue: String = StringUtils.reformatCSVString(value)
-val csvOpType: String = StringUtils.reformatCSVString(opType.toString)
+lazy val csvValue: String = StringUtils.reformatCSVString(value)
+lazy val csvOpType: String = StringUtils.reformatCSVString(opType.toString)

override def getOpName: String = value
override def getOpNameCSV: String = csvValue
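The switch to lazy val above defers the CSV re-formatting until an operator ref is actually written out, while still computing each value at most once. A standalone, REPL-style sketch of that pattern (the escaping is illustrative, not the real StringUtils.reformatCSVString):

class CsvRef(value: String) {
  lazy val csvValue: String = {
    println(s"formatting $value")              // runs once, on first access
    "\"" + value.replace("\"", "\"\"") + "\""  // quote and escape for CSV (illustrative)
  }
}

val r = new CsvRef("a,b")
r.csvValue  // prints "formatting a,b" and caches the result
r.csvValue  // cached; no second print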
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.AppSummaryInfoBaseProvider
import com.nvidia.spark.rapids.tool.views.WriteOpProfileResult

case class ApplicationSummaryInfo(
appInfo: Seq[AppInfoProfileResults],
@@ -47,7 +48,8 @@ case class ApplicationSummaryInfo(
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
-sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])
+sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
+writeOpsInfo: Seq[WriteOpProfileResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
@@ -56,6 +56,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfDataSourceView.getRawView(apps, cachedSqlAccum)
}

// get the write operations information
def getWriteOperationInfo: Seq[WriteOpProfileResult] = {
ProfWriteOpsView.getRawView(apps)
}

// get executor related information
def getExecutorInfo: Seq[ExecutorInfoProfileResult] = {
ProfExecutorView.getRawView(apps)
@@ -399,13 +399,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
-collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
-rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
-analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
-failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
-unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
-appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo),
-compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
+collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
+rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
+analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
+failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
+unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
+appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo,
+collect.getWriteOperationInfo),
+compareRes,
+DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
}

/**
@@ -502,7 +504,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex),
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
-appsSum.flatMap(_.sparkRapidsBuildInfo)
+appsSum.flatMap(_.sparkRapidsBuildInfo),
+appsSum.flatMap(_.writeOpsInfo).sortBy(_.appIndex)
)
Seq(reduced)
} else {
@@ -546,6 +549,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
Some(AGG_DESCRIPTION(SQL_AGG_LABEL)))
profileOutputWriter.write(IO_LABEL, app.ioMetrics)
profileOutputWriter.write(SQL_DUR_LABEL, app.durAndCpuMet)
// writeOps are generated only in CSV format
profileOutputWriter.writeCSVTable(ProfWriteOpsView.getLabel, app.writeOpsInfo)
val skewHeader = TASK_SHUFFLE_SKEW
val skewTableDesc = AGG_DESCRIPTION(TASK_SHUFFLE_SKEW)
profileOutputWriter.write(skewHeader, app.skewInfo, tableDesc = Some(skewTableDesc))