Generate a detailed report for the write ops (#1544)
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Fixes #1536

This commit adds a new report containing the write operations.
The generated report is mostly functional for
`InsertIntoHadoopFsRelationCommand`; more incremental improvements
will follow.

- New report:

No text format is generated for this report because including the path
and the node description in text output would not be readable and would
add unnecessary overhead.

The generated report is called `write_operations.csv`; notes on its
contents follow (an illustrative sample row is shown after the list):

- `sqlPlanVersion`: the version number of the SQL plan. This is helpful if we
want to generate the write operations for all plan versions when AQE is
enabled.
- `fromFinalPlan`: True when the plan is final.
- If a field cannot be extracted, "`unknown`" is inserted.
- `fullDescription`: contains the actual node description, truncated at 500
characters.
- `outputColumns`: lists the columns separated by semicolons. If the schema
is truncated, the column value will include something like `... and 24
more fields`.
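
For illustration only, a hypothetical row might look like the sample below;
the exact column names and their order in the real report are assumptions
based on the fields described above, not the actual schema:

```
sqlPlanVersion,fromFinalPlan,execName,format,location,tableName,dataBase,outputColumns,writeMode,fullDescription
1,true,InsertIntoHadoopFsRelationCommand,parquet,gs://bucket/warehouse/mydb/mytable,mytable,mydb,col_a;col_b,Append,"Execute InsertIntoHadoopFsRelationCommand gs://bucket/warehouse/mydb/mytable, false, parquet, ..."
```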


---------

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
amahussein authored Feb 19, 2025
1 parent ba84588 commit 78cab00
Showing 21 changed files with 947 additions and 59 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@ package com.nvidia.spark.rapids.tool.planparser
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.store.{WriteOperationMetaBuilder, WriteOperationMetadataTrait}
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class DataWritingCommandExecParser(
node: SparkPlanGraphNode,
@@ -106,13 +108,13 @@ object DataWritingCommandExecParser {
// This is used for expressions that do not show the format as part of the description.
private val specialWriteFormatMap = Map[String, String](
// if appendDataExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
appendDataExecV1 -> "unknown",
appendDataExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if overwriteByExprExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
overwriteByExprExecV1 -> "unknown",
overwriteByExprExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if atomicReplaceTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicReplaceTableExec -> "unknown",
atomicReplaceTableExec -> StringUtils.UNKNOWN_EXTRACT,
// if atomicCreateTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicCreateTableExec -> "unknown"
atomicCreateTableExec -> StringUtils.UNKNOWN_EXTRACT
)

// Checks whether a node is a write CMD Exec
@@ -175,4 +177,180 @@ object DataWritingCommandExecParser {
parsedString.split(",")(0) // return third parameter from the input string
}
}

/**
* Extracts metadata information from a write operation node description.
* This method is specifically designed to parse the description of
* `InsertIntoHadoopFsRelationCommand` nodes and extract relevant details
* such as the output path, data format, write mode, catalog information,
* and output columns.
* An example of the pattern is:
* Execute InsertIntoHadoopFsRelationCommand /path/to/warehouse/database/table, false, format,
* [key1=value1, key2=value2], Append, `SparkCatalog`.`database`.`table`, ClassName,
* [outputColumns]
*
* The method performs the following steps:
* - Extracts the output path and data format from the node description.
* - Determines the write mode (e.g., Append, Overwrite) based on specific keywords in the
* description.
* - Extracts catalog information (database and table name) from the output path.
* - Extracts the output columns if available in the description.
* - Builds and returns a `WriteOperationMetadataTrait` object encapsulating the extracted
* metadata.
*
* This method includes error handling to ensure graceful fallback to default values
* (e.g., `UNKNOWN_EXTRACT`) in case of unexpected input or parsing errors.
*
* @param execName The name of the execution command (e.g., `InsertIntoHadoopFsRelationCommand`).
* @param nodeDescr The description of the node, typically containing details about the write
* operation.
* @return A `WriteOperationMetadataTrait` object containing the extracted metadata.
*/
private def extractWriteOpRecord(
execName: String, nodeDescr: String): WriteOperationMetadataTrait = {
// Helper function to extract catalog information (database and table name) from the output
// path.
def extractCatalog(path: String): (String, String) = {
try {
// The location path contains the database and the table as the last 2 entries.
// Example: gs:///path/to/warehouse/database/table
// Split the URI into parts by "/"
val pathParts = path.split("/").filter(_.nonEmpty)
if (pathParts.length >= 2) {
// Extract the last two parts as database and table name
val database = pathParts(pathParts.length - 2)
val tableName = pathParts.last
(database, tableName)
} else {
// If not enough parts, return UNKNOWN_EXTRACT
(StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
} catch {
// Handle any unexpected errors gracefully
case _: Exception => (StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
}
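
// Illustration only (hypothetical path, not taken from the source): for
// "gs://bucket/warehouse/mydb/mytable" the non-empty parts are
// ["gs:", "bucket", "warehouse", "mydb", "mytable"], so the helper above would
// return (database = "mydb", table = "mytable").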

// Helper function to extract the output path and data format from the node description.
def extractPathAndFormat(args: Array[String]): (String, String) = {
// This method expects the arguments to be nodeDescr.split(",", 3)
// `Execute cmd path/to/warehouse/db/table, false, parquet, [write options],.*`.
// - 1st arg is always the cmd followed by the path.
// - 2nd arg is a boolean argument that we do not care about.
// - 3rd arg is either the format, or the list of write options.

// Extract the path from the first argument
val path =
args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
// Extract the data format from the third argument
val thirdArg = args.lift(2).getOrElse("").trim
val format = if (thirdArg.startsWith("[")) {
// Optional parameter is present in the eventlog. Get the fourth parameter by skipping the
// optional parameter string.
thirdArg.split("(?<=],)")
.map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
} else {
thirdArg.split(",").headOption.getOrElse("").trim
}
(path, format)
}
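
// Illustration only (hypothetical description, not taken from the source): splitting
// "Execute InsertIntoHadoopFsRelationCommand gs://bucket/warehouse/mydb/mytable, false, parquet, [...], Append, ..."
// with split(",", 3) makes the path the last whitespace-separated token of the first
// argument and "parquet" the head of the third argument, so the helper above would
// return ("gs://bucket/warehouse/mydb/mytable", "parquet").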

// Helper function to determine the write mode (e.g., Append, Overwrite) from the description.
def extractWriteMode(description: String): String = {
val modes = Map(
", Append," -> "Append",
", Overwrite," -> "Overwrite",
", ErrorIfExists," -> "ErrorIfExists",
", Ignore," -> "Ignore"
)
// Match the description against known write modes
modes.collectFirst { case (key, mode) if description.contains(key) => mode }
.getOrElse(StringUtils.UNKNOWN_EXTRACT)
}

// Helper function to extract output columns from the node description.
def extractOutputColumns(description: String): Option[String] = {
// The output columns are found as the last bracketed sequence. This method uses a
// regex to match string values inside brackets and then picks the last one.
// Use a regular expression to find column definitions enclosed in square brackets.
val columnsRegex = """\[(.*?)\]""".r
columnsRegex.findAllMatchIn(description).map(_.group(1)).toList.lastOption
.map(_.replaceAll(",\\s+", ";")) // Replace commas with semicolons for better readability
}
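
// Illustration only (hypothetical description, not taken from the source): with two
// bracketed groups such as "... [compression=snappy], Append, ..., [col_a, col_b, col_c]",
// the regex matches both, the last one is kept, and ", " is replaced with ";",
// yielding Some("col_a;col_b;col_c").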

// Parse the node description into arguments
val splitArgs = nodeDescr.split(",", 3)

// Extract the output path and data format
val (path, format) = extractPathAndFormat(splitArgs)

// Extract the write mode (e.g., Append, Overwrite)
val writeMode = extractWriteMode(nodeDescr)

// Extract catalog information (database and table name) from the output path
val (catalogDB, catalogTable) = extractCatalog(path)

// Extract the output columns, if available
val outColumns = extractOutputColumns(nodeDescr)

// Build and return the metadata object encapsulating all extracted information
WriteOperationMetaBuilder.build(
execName = execName,
dataFormat = format,
outputPath = Option(path),
outputColumns = outColumns,
writeMode = writeMode,
tableName = catalogTable,
dataBaseName = catalogDB,
fullDescr = Some(nodeDescr)
)
}
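
// End-to-end illustration only (hypothetical node description, not taken from the source):
// for "Execute InsertIntoHadoopFsRelationCommand gs://bucket/warehouse/mydb/mytable, false,
// parquet, [compression=snappy], Append, `spark_catalog`.`mydb`.`mytable`, SomeClass,
// [col_a, col_b]", the helpers above would produce dataFormat = "parquet",
// writeMode = "Append", dataBaseName = "mydb", tableName = "mytable", and
// outputColumns = Some("col_a;col_b").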

/**
* Extracts metadata information from a given SparkPlanGraphNode representing a write operation.
*
* This method determines the type of write operation (e.g., Delta Lake or other supported
* commands) and extracts relevant metadata such as execution name, data format, output path,
* write mode, and catalog information. It uses helper methods to parse the node description
* and build a metadata object encapsulating the extracted details.
*
* The method performs the following steps:
* 1. Retrieves the node description from the provided SparkPlanGraphNode.
* 2. Checks if the node is a Delta Lake write operation using DeltaLakeHelper.
* 3. Retrieves the appropriate command wrapper (logical or physical) for the node.
* 4. If the command is `InsertIntoHadoopFsRelationCommand`, it invokes a specialized method
* `extractWriteOpRecord` to extract detailed metadata.
* 5. For other commands, it builds a metadata object using the command wrapper's information.
* 6. If no command wrapper is found, it falls back to building a metadata object with minimal
* information.
*
* @param node The SparkPlanGraphNode representing the write operation.
* @return A WriteOperationMetadataTrait object containing the extracted metadata.
*/
def getWriteOpMetaFromNode(node: SparkPlanGraphNode): WriteOperationMetadataTrait = {
// Determine the appropriate command wrapper based on whether the node is a Delta Lake write
// operation.
val cmdWrapper = if (DeltaLakeHelper.acceptsWriteOp(node)) {
DeltaLakeHelper.getWriteCMDWrapper(node)
} else {
getWriteCMDWrapper(node)
}
// Process the command wrapper to extract metadata
cmdWrapper match {
case Some(cmdWrapper) =>
// If the command is InsertIntoHadoopFsRelationCommand, extract detailed metadata.
if (cmdWrapper.execName == DataWritingCommandExecParser.insertIntoHadoopCMD) {
extractWriteOpRecord(cmdWrapper.execName, node.desc)
} else {
// For other commands, build metadata using the command wrapper information.
WriteOperationMetaBuilder.build(
execName = cmdWrapper.execName,
dataFormat = cmdWrapper.dataFormat,
fullDescr = Some(node.desc))
}
case _ =>
// No command wrapper is found, build metadata with minimal information.
WriteOperationMetaBuilder.buildNoMeta(Some(node.desc))
}
}
}
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.{SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.SqlPlanInfoGraphEntry
import org.apache.spark.sql.rapids.tool.util.StringUtils

// A class used to handle the DL writeOps such as:
// - AppendDataExecV1
@@ -135,8 +136,6 @@ object DeltaLakeHelper {
def parseNode(node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long): ExecInfo = {
val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
opExec.parse
node match {
case n if acceptsWriteOp(n) =>
val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
@@ -145,6 +144,25 @@
}
}

/**
* Get the write command wrapper for the given deltaLake write exec node.
* This method should be called only if the node passes the `acceptsWriteOp` check.
* @param node the deltaLake write exec
* @return the write command wrapper
*/
def getWriteCMDWrapper(node: SparkPlanGraphNode): Option[DataWritingCmdWrapper] = {
val wcmd = exclusiveDeltaExecs.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ =>
deltaExecsFromSpark.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ => StringUtils.UNKNOWN_EXTRACT
}
}
// The format must be delta
Some(DataWritingCmdWrapper(wcmd, DataWritingCommandExecParser.dataWriteCMD, getWriteFormat))
}

// Kept for future use if we find that SerDe library can be used to deduce any information to
// reflect on the support of the Op
def getSerdeLibrary(nodeDesc: String): Option[String] = {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.planparser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.EventUtils
import org.apache.spark.sql.rapids.tool.util.{EventUtils, StringUtils}

// A wrapper class to map between
case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
@@ -68,7 +68,7 @@ object HiveParseHelper extends Logging {
}

def getHiveFormatFromSimpleStr(str: String): String = {
LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse("unknown")
LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse(StringUtils.UNKNOWN_EXTRACT)
}

// Given a "scan hive" NodeGraph, construct the MetaData based on the SerDe class.
@@ -22,6 +22,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class ReadMetaData(schema: String, location: String, format: String,
tags: Map[String, String] = ReadParser.DEFAULT_METAFIELD_MAP) {
@@ -60,7 +61,7 @@ object ReadParser extends Logging {
val METAFIELD_TAG_FORMAT = "Format"
val METAFIELD_TAG_LOCATION = "Location"

val UNKNOWN_METAFIELD: String = "unknown"
val UNKNOWN_METAFIELD: String = StringUtils.UNKNOWN_EXTRACT
val DEFAULT_METAFIELD_MAP: Map[String, String] = collection.immutable.Map(
METAFIELD_TAG_DATA_FILTERS -> UNKNOWN_METAFIELD,
METAFIELD_TAG_PUSHED_FILTERS -> UNKNOWN_METAFIELD,
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import org.apache.spark.sql.rapids.tool.util.StringUtils

class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait {
// Preformatted values for CSV output to avoid reformatting multiple times.
val csvValue: String = StringUtils.reformatCSVString(value)
val csvOpType: String = StringUtils.reformatCSVString(opType.toString)
lazy val csvValue: String = StringUtils.reformatCSVString(value)
lazy val csvOpType: String = StringUtils.reformatCSVString(opType.toString)

override def getOpName: String = value
override def getOpNameCSV: String = csvValue
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.AppSummaryInfoBaseProvider
import com.nvidia.spark.rapids.tool.views.WriteOpProfileResult

case class ApplicationSummaryInfo(
appInfo: Seq[AppInfoProfileResults],
@@ -47,7 +48,8 @@ case class ApplicationSummaryInfo(
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
writeOpsInfo: Seq[WriteOpProfileResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
@@ -56,6 +56,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfDataSourceView.getRawView(apps, cachedSqlAccum)
}

// get the write records information
def getWriteOperationInfo: Seq[WriteOpProfileResult] = {
ProfWriteOpsView.getRawView(apps)
}

// get executor related information
def getExecutorInfo: Seq[ExecutorInfoProfileResult] = {
ProfExecutorView.getRawView(apps)
@@ -399,13 +399,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo),
compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo,
collect.getWriteOperationInfo),
compareRes,
DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
}

/**
@@ -502,7 +504,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex),
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
appsSum.flatMap(_.sparkRapidsBuildInfo)
appsSum.flatMap(_.sparkRapidsBuildInfo),
appsSum.flatMap(_.writeOpsInfo).sortBy(_.appIndex)
)
Seq(reduced)
} else {
@@ -546,6 +549,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some(AGG_DESCRIPTION(SQL_AGG_LABEL)))
profileOutputWriter.write(IO_LABEL, app.ioMetrics)
profileOutputWriter.write(SQL_DUR_LABEL, app.durAndCpuMet)
// writeOps are generated in only CSV format
profileOutputWriter.writeCSVTable(ProfWriteOpsView.getLabel, app.writeOpsInfo)
val skewHeader = TASK_SHUFFLE_SKEW
val skewTableDesc = AGG_DESCRIPTION(TASK_SHUFFLE_SKEW)
profileOutputWriter.write(skewHeader, app.skewInfo, tableDesc = Some(skewTableDesc))