
Commit

clarify parse comments
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
amahussein committed Feb 19, 2025
1 parent ae63448 commit b8332f6
Showing 2 changed files with 25 additions and 11 deletions.
@@ -184,14 +184,18 @@ object DataWritingCommandExecParser {
* `InsertIntoHadoopFsRelationCommand` nodes and extract relevant details
* such as the output path, data format, write mode, catalog information,
* and output columns.
+ * An example of the pattern is:
+ * Execute InsertIntoHadoopFsRelationCommand /path/to/warehouse/database/table, false, format,
+ * [key1=value1, key2=value2], Append, `SparkCatalog`.`database`.`table`, ClassName,
+ * [outputColumns]
*
* The method performs the following steps:
- * - Extracts the output path and data format from the node description.
- * - Determines the write mode (e.g., Append, Overwrite) based on specific keywords in the
+ * Extracts the output path and data format from the node description.
+ * Determines the write mode (e.g., Append, Overwrite) based on specific keywords in the
* description.
- * - Extracts catalog information (database and table name) from the output path.
- * - Extracts the output columns if available in the description.
- * - Builds and returns a `WriteOperationMetadataTrait` object encapsulating the extracted
+ * Extracts catalog information (database and table name) from the output path.
+ * Extracts the output columns if available in the description.
+ * Builds and returns a `WriteOperationMetadataTrait` object encapsulating the extracted
* metadata.
*
* This method includes error handling to ensure graceful fallback to default values
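As an aside (not part of the diff): a minimal standalone Scala sketch of the write-mode step described in the doc comment above. The object name, the keyword list, and the "Unknown" fallback are assumptions for this example, not the parser's actual code.

// Hypothetical sketch: detect the write mode by scanning the node description
// for known keyword tokens; keyword list and fallback are assumed.
object WriteModeSketch {
  private val knownModes = Seq("Append", "Overwrite", "ErrorIfExists", "Ignore")

  def detectWriteMode(description: String): String =
    knownModes.find(mode => description.contains(s", $mode,")).getOrElse("Unknown")

  def main(args: Array[String]): Unit = {
    val desc = "Execute InsertIntoHadoopFsRelationCommand /path/to/warehouse/database/table, " +
      "false, format, [key1=value1, key2=value2], Append, `SparkCatalog`.`database`.`table`, " +
      "ClassName, [outputColumns]"
    println(detectWriteMode(desc)) // prints: Append
  }
}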
@@ -208,6 +212,8 @@ object DataWritingCommandExecParser {
// path.
def extractCatalog(path: String): (String, String) = {
try {
+ // The location path contains the database and the table as the last 2 entries.
+ // Example: gs:///path/to/warehouse/database/table
// Split the URI into parts by "/"
val pathParts = path.split("/").filter(_.nonEmpty)
if (pathParts.length >= 2) {
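For illustration (not part of the diff): a self-contained Scala sketch of the catalog extraction described above, which splits the location URI on "/" and treats the last two non-empty segments as (database, table). The "unknown" fallback is an assumed placeholder, not the real `StringUtils.UNKNOWN_EXTRACT` constant.

// Hypothetical standalone sketch of the path-splitting idea above.
object CatalogSketch {
  def extractCatalog(path: String): (String, String) = {
    // Split the URI into parts by "/" and drop empty segments.
    val parts = path.split("/").filter(_.nonEmpty)
    if (parts.length >= 2) (parts(parts.length - 2), parts.last)
    else ("unknown", "unknown") // assumed fallback for this sketch
  }

  def main(args: Array[String]): Unit = {
    println(extractCatalog("gs:///path/to/warehouse/database/table"))
    // prints: (database,table)
  }
}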
@@ -227,6 +233,12 @@

// Helper function to extract the output path and data format from the node description.
def extractPathAndFormat(args: Array[String]): (String, String) = {
+ // This method expects the arguments to be the result of nodeDescr.split(",", 3) on
+ // `Execute cmd path/to/warehouse/db/table, false, parquet, [write options],.*`:
+ // - 1st arg is always the cmd followed by the path.
+ // - 2nd arg is a boolean argument that we do not care about.
+ // - 3rd arg is either the format or the list of write options.

// Extract the path from the first argument
val path =
args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
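For illustration (not part of the diff): a small Scala sketch of the three-argument layout described in the comment above. The sample description and the "unknown" fallback are assumptions for this example.

// Hypothetical sketch of splitting a node description into at most three arguments.
object SplitSketch {
  def main(args: Array[String]): Unit = {
    val nodeDescr = "Execute InsertIntoHadoopFsRelationCommand gs://bucket/db/tbl, " +
      "false, Parquet, [serialization.format=1], Append"
    // Split into at most three arguments, mirroring nodeDescr.split(",", 3).
    val parts = nodeDescr.split(",", 3)
    // 1st arg: the command followed by the path; the path is the last token.
    val path = parts.headOption.map(_.split("\\s+").last.trim).getOrElse("unknown")
    // 3rd arg: either the format or the bracketed list of write options.
    val third = if (parts.length > 2) parts(2).trim else "unknown"
    println(s"path=$path, thirdArgPrefix=${third.takeWhile(_ != ',')}")
    // prints: path=gs://bucket/db/tbl, thirdArgPrefix=Parquet
  }
}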
@@ -258,7 +270,9 @@ object DataWritingCommandExecParser {

// Helper function to extract output columns from the node description.
def extractOutputColumns(description: String): Option[String] = {
- // Use a regular expression to find column definitions enclosed in square brackets
+ // The output columns are found as the last bracketed sequence in the description. This
+ // method uses a regex to match the values inside brackets and then picks the last match.
+ // Use a regular expression to find column definitions enclosed in square brackets.
val columnsRegex = """\[(.*?)\]""".r
columnsRegex.findAllMatchIn(description).map(_.group(1)).toList.lastOption
.map(_.replaceAll(",\\s+", ";")) // Replace commas with semicolons for better readability
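For illustration (not part of the diff): the bracket-matching step above, pulled out as a self-contained Scala sketch. The surrounding object and the sample description are assumptions for this example; the regex and the semicolon replacement mirror the code shown.

// Self-contained sketch of the output-column extraction above.
object OutputColumnsSketch {
  def extractOutputColumns(description: String): Option[String] = {
    // Find every bracketed group, keep the last one (the output columns), and
    // replace comma separators with semicolons for readability.
    val columnsRegex = """\[(.*?)\]""".r
    columnsRegex.findAllMatchIn(description).map(_.group(1)).toList.lastOption
      .map(_.replaceAll(",\\s+", ";"))
  }

  def main(args: Array[String]): Unit = {
    val desc = "Execute ... [key1=value1, key2=value2], Append, `cat`.`db`.`tbl`, [col01, col02, col03]"
    println(extractOutputColumns(desc)) // prints: Some(col01;col02;col03)
  }
}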
@@ -328,14 +342,14 @@ object DataWritingCommandExecParser {
if (cmdWrapper.execName == DataWritingCommandExecParser.insertIntoHadoopCMD) {
extractWriteOpRecord(cmdWrapper.execName, node.desc)
} else {
- // For other commands, build metadata using the command wrapper's information.
+ // For other commands, build metadata using the command wrapper information.
WriteOperationMetaBuilder.build(
execName = cmdWrapper.execName,
dataFormat = cmdWrapper.dataFormat,
fullDescr = Some(node.desc))
}
case _ =>
- // If no command wrapper is found, build metadata with minimal information.
+ // No command wrapper found; build metadata with minimal information.
WriteOperationMetaBuilder.buildNoMeta(Some(node.desc))
}
}
@@ -67,7 +67,7 @@ class WriteOperationParserSuite extends FunSuite {
desc = "Execute InsertIntoHadoopFsRelationCommand gs://path/to/database/table1, " +
"false, Parquet, " +
"[serialization.format=1, mergeschema=false, __hive_compatible_bucketed_table_insertion__=true], " +
"Append, spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"Append, `spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"org.apache.spark.sql.execution.datasources.InMemoryFileIndex(gs://path/to/database/table1), " +
"[col01, col02, col03]",
Seq.empty
@@ -158,7 +158,7 @@ class WriteOperationParserSuite extends FunSuite {
desc = "Execute InsertIntoHadoopFsRelationCommand gs://path/to/database/table1, " +
"false, Parquet, " +
"[serialization.format=1, mergeschema=false, __hive_compatible_bucketed_table_insertion__=true], " +
"Append, spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"Append, `spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"org.apache.spark.sql.execution.datasources.InMemoryFileIndex(gs://path/to/database/table1), " +
"[]",
Seq.empty
@@ -182,7 +182,7 @@ class WriteOperationParserSuite extends FunSuite {
desc = "Execute InsertIntoHadoopFsRelationCommand gs://path/to/database/table1, " +
"false, [paths=(path)], Parquet, " +
"[serialization.format=1, mergeschema=false, __hive_compatible_bucketed_table_insertion__=true], " +
"Append, spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"Append, `spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"org.apache.spark.sql.execution.datasources.InMemoryFileIndex(gs://path/to/database/table1), " +
"[col01]",
Seq.empty
