[BUG] Fix the PPL Lookup command behavior when inputField is missing and REPLACE with existing fields #1035

Merged (4 commits) on Feb 10, 2025

Conversation

@LantaoJin (Member) commented Feb 5, 2025

Description

issue-1:

The current behaviour of inputField is "If you don't specify any <inputField>, all fields of <lookupIndex> where matched values are applied to result output."

The correct behaviour should be "If you don't specify any <inputField>, all fields of <lookupIndex> that are not the mapping fields where matched values are applied to result output."

issue-2:

A known bug. The existing IT "test LOOKUP lookupTable name REPLACE occupation" provided a workaround. This PR fixes the issue and provides a new IT "test LOOKUP lookupTable name REPLACE occupation - 2".

Related Issues

Resolves #1026

Check List

  • Updated documentation (docs/ppl-lang/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • Newly added source code should include a copyright header
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…and REPLACE with existing fields

Signed-off-by: Lantao Jin <ltjin@amazon.com>
@@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {

   override def apply(extensions: SparkSessionExtensions): Unit = {
     extensions.injectParser { (spark, parser) =>
-      new FlintSparkPPLParser(parser)
+      new FlintSparkPPLParser(parser, spark)
(Member Author) The SparkSession in use is passed in from here.

boolean sourceTableExists = spark.sessionState().catalog().tableExists(sourceTableIdentifier);
if (sourceTableExists) {
try {
CatalogTable table = spark.sessionState().catalog().getTableMetadata(getTableIdentifier(tableQualifiedName));
@LantaoJin (Member Author) Feb 5, 2025: This is just a metadata call made through the external catalog of the SparkSession passed in. No additional Spark job will be submitted.

@YANG-DB (Member) Feb 5, 2025: Would it make sense to make this metadata discovery more generic so that all relations have this info?

(Member Author) Yes, if we see more requirements later. For now I would still prefer to leave the metadata binding to Spark internals unless it becomes necessary.

@@ -429,4 +415,32 @@ class FlintSparkPPLLookupITSuite

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test LOOKUP lookupTable name") {
@penghuo (Collaborator) Feb 5, 2025: expectedResults should only have one occupation column, but it has 2 occupation columns now.

IMO, the lookup command enriches your fact table by pulling in columns from a lookup (dimension) table. If the same column name appears in both tables, you can choose one of two conflict-resolution strategies:

  • REPLACE overwrites values in the source column with values from the lookup table.
  • APPEND only writes values to the source column if the column is currently missing or null.
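
For illustration, here is a conceptual Spark sketch of these two strategies (an assumption about the intended semantics, not the plugin's actual code), where source and lookup are DataFrames that share the join key name and a conflicting occupation column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

def applyLookup(source: DataFrame, lookup: DataFrame, strategy: String): DataFrame = {
  // Rename the conflicting lookup column so the left outer join keeps both values.
  val lk = lookup.withColumnRenamed("occupation", "_lk_occupation")
  val joined = source.join(lk, Seq("name"), "left_outer")
  val merged = strategy match {
    // REPLACE: the matched lookup value overwrites the source value.
    case "REPLACE" => coalesce(col("_lk_occupation"), col("occupation"))
    // APPEND: the lookup value is used only when the source value is missing/null.
    case "APPEND" => coalesce(col("occupation"), col("_lk_occupation"))
    case other => throw new IllegalArgumentException(s"Unknown strategy: $other")
  }
  joined.withColumn("occupation", merged).drop("_lk_occupation")
}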

@LantaoJin (Member Author) Feb 6, 2025: Let me double-check the behaviour of SPL. My understanding is that we keep 2 occupation columns because we don't know which strategy applies for the command | LOOKUP lookupTable name.

When using the lookup command, if an OUTPUT or OUTPUTNEW clause is not specified, all of the fields in the lookup table that are not the match fields are used as output fields. If the OUTPUT clause is specified, the output lookup fields overwrite existing fields. If the OUTPUTNEW clause is specified, the lookup is not performed for events in which the output fields already exist.

(Collaborator) Is OUTPUT the default option? Then | LOOKUP lookupTable name is equivalent to | LOOKUP lookupTable name REPLACE dimensionTable.*, and the column conflict-resolution should be applied.

BTW, if the output includes 2 occupation columns, don't those names conflict?

(Member Author) You are right, the occupation field from the source should be replaced with the occupation field from the lookup table if there are duplicated columns. Double-confirmed via SPL.

@LantaoJin (Member Author) Feb 6, 2025: But achieving this may bring more complexity than expected, because the duplication is not between the field names of the lookup table and the source table, but between the field names of the lookup table and the output of the source plan. Our current implementation passes an unresolved logical plan to Spark, so at this point we have not yet obtained the output of the source plan.

@LantaoJin (Member Author) Feb 6, 2025: @penghuo, I have pushed a fix to address your comment in 2e954b2. Here is the new behaviour:

source

id  col1  col2
1   a     b
2   aa    bb

lookup

id  col1  col3
1   x     y
3   xx    yy

index=source | lookup id

will output

id  col1  col2  col3
1   x     b     y
2   null  bb    null

Notice that the value of col1 for row 2 is null instead of aa; by my testing this produces the same result as SPL (although it looks a bit strange).

(Collaborator) Agreed, the behavior is strange, but let's align with it.

Updated my understanding of lookup after testing with SPL:

The lookup command enriches your fact table by performing a left join with a lookup (dimension) table. Specifically, it:

  1. Joins the fact table with the lookup table based on a join key (lookupMappingField). By default, the command appends all columns from the lookup table (except the join key) to the fact table. Alternatively, you can explicitly specify the desired output fields.
  2. Resolves column name conflicts between the two tables using one of two strategies:
    • REPLACE: The lookup table’s value replaces the fact table’s value.
    • APPEND: The lookup table’s value is used only if the corresponding fact table column is missing or null.

One highlight is:

  • fact table

    id  col1  col2
    1   a     b
    2   aa    bb
    3   null  ccc

  • dimension table

    id  col1  col3
    1   x     y
    3   xx    yy

With source="fact.csv" | lookup "dim.csv" id OUTPUT id, col1, col3, the fact table's id column is replaced with the dimension table's id column, and the result is:

id    col1  col2  col3
1     x     b     y
null  null  bb    null
3     xx    ccc   yy
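
For reference, a self-contained Spark sketch (an assumed illustration, not the plugin's implementation) that reproduces this SPL-aligned result: the conflicting fact columns are dropped and the lookup table's columns are taken directly from a left outer join, so unmatched rows end up with nulls:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("lookup-demo").getOrCreate()
import spark.implicits._

val fact = Seq((1, Option("a"), "b"), (2, Option("aa"), "bb"), (3, Option.empty[String], "ccc"))
  .toDF("id", "col1", "col2")
val dim = Seq((1, "x", "y"), (3, "xx", "yy")).toDF("id", "col1", "col3")

val replaced = fact
  .join(dim.select($"id".as("lk_id"), $"col1".as("lk_col1"), $"col3"), $"id" === $"lk_id", "left_outer")
  .select($"lk_id".as("id"), $"lk_col1".as("col1"), $"col2", $"col3")

// replaced.show() prints (row order may vary):
// id    col1  col2  col3
// 1     x     b     y
// null  null  bb    null
// 3     xx    ccc   yy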

@LantaoJin (Member Author) Feb 7, 2025: The null values for the id and col1 fields of row 2 really surprised me. In our implementation, only the matched rows (explicitly joined rows) have the output strategy applied (whether REPLACE or APPEND). So the output of source="fact.csv" | lookup "dim.csv" id REPLACE id, col1, col3 is:

id  col1  col2  col3
1   x     b     y
2   aa    bb    null
3   xx    ccc   yy

But it seems that in Splunk, whether a row is matched or not, the conflicting column from the source is replaced with the new value from the lookup table. @penghuo shall we refactor to align with Splunk's output?
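
By contrast, a sketch of the coalesce-style fallback described above (an assumed illustration, reusing the fact and dim DataFrames from the earlier sketch): the lookup value is applied only to matched rows, and the source value is kept otherwise:

import org.apache.spark.sql.functions.coalesce

val fallbackStyle = fact
  .join(dim.select($"id".as("lk_id"), $"col1".as("lk_col1"), $"col3"), $"id" === $"lk_id", "left_outer")
  .select(
    // Fall back to the source value when the left outer join did not match.
    coalesce($"lk_id", $"id").as("id"),
    coalesce($"lk_col1", $"col1").as("col1"),
    $"col2",
    $"col3")

// id  col1  col2  col3
// 1   x     b     y
// 2   aa    bb    null
// 3   xx    ccc   yy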

(Member Author) Aligned all behaviours in 42eb9e0.

(Collaborator) Yes, it is not intuitive, but let's align with SPL.

@penghuo added the Lang:PPL (Pipe Processing Language support) label on Feb 5, 2025
@YANG-DB (Member) left a comment: Apart from a minor question - LGTM

Signed-off-by: Lantao Jin <ltjin@amazon.com>
// the matched values of `col1` from the lookup table will replace the values of `col1` from the source.
Set<Field> intersection = new HashSet<>(RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), lookupTable));
LOG.debug("The fields list in lookup table are {}", intersection);
List<Field> sourceOutput = RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), searchSide);
(Contributor) What if searchSide is not a table and its output differs from the original fields of the catalog relation?

(Member Author) Yes. I am thinking about how to resolve the output of the plan without submitting a Spark job. Any ideas?

(Contributor) If we don't need to fall back to the source value on a mismatch, a simple idea is to ignore the output fields of searchSide; it won't throw an exception even though we want to drop columns that don't exist in the output of the plan.

Otherwise, it seems we always need an analyzed plan to handle the fallback logic.

(Member Author) Yes, why did I initially get stuck on trying to obtain the attributes of the source table?

(Member) +1 for @qianheng-aws suggestion on ignoring the output fields of searchSide

@qianheng-aws (Contributor) commented Feb 6, 2025:

Found another issue with the lookup command. The PPL below gets an exception:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `new_col` cannot be resolved.

source = $sourceTable | LOOKUP $lookupTable name REPLACE occupation as new_col.

But it should be supported as said in the doc:

If you specify <outputField> with an existing field name in source query, its values will be replaced or appended by matched values from <inputField>. If the field specified in <outputField> is a new field, an extended new field will be applied to the results.
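
A minimal standalone Spark sketch of this failure mode (an assumed reproduction, not the plugin's code): the rewrite builds a Coalesce over the requested output field, and Catalyst cannot resolve new_col because it does not exist in the source:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(1).selectExpr("'alice' AS name", "'engineer' AS occupation").createOrReplaceTempView("src")

// Fails analysis with [UNRESOLVED_COLUMN.WITH_SUGGESTION]:
// A column or function parameter with name `new_col` cannot be resolved.
spark.sql("SELECT name, coalesce(new_col, occupation) AS new_col FROM src").show()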

@LantaoJin (Member Author) commented Feb 6, 2025:

Good catch! It's because in the current implementation a Coalesce expression cannot be resolved against an attribute that does not exist. Maybe we can add some special handling once we have a way to extract the output of the source plan: only extract expressions from the last Project and Aggregation.

// lookup mapping keys are not dropped here; they will be checked later.
intersection.removeAll(mappingFieldsOfLookup);
intersection.retainAll(sourceOutput);
List<Expression> duplicated = buildProjectListFromFields(new ArrayList<>(intersection), expressionAnalyzer, context)
@qianheng-aws (Contributor) Feb 6, 2025: If we drop the duplicated columns directly, then its behavior differs from what we do with the explicit replacement here:

// The result output project list we build here is used to replace the source output,
// for the unmatched rows of left outer join, the outputs are null, so fall back to source output.

@LantaoJin (Member Author) Feb 6, 2025: That's why I said it is strange in SPL, see #1035 (comment).

(Member Author) Not sure whether it is a bug in SPL or just the designed behaviour.

(Contributor) If we follow that behavior, I think it will make the logic simpler. Then we need to change the logic in buildOutputProjectList as well to align.

@LantaoJin (Member Author) Feb 6, 2025: No, this strange behavior only happens for the duplicated fields when the REPLACE/APPEND clause is missing (that is why I call it strange). The fall-back-to-source-output behavior still exists when a REPLACE/APPEND clause is present.

Signed-off-by: Lantao Jin <ltjin@amazon.com>
@LantaoJin (Member Author):

> Good catch! It's because in the current implementation a Coalesce expression cannot be resolved against an attribute that does not exist.

One option (the latest commit) is to keep the current query-failure behaviour to simplify the logic. I have updated the doc description.

Signed-off-by: Lantao Jin <ltjin@amazon.com>
@LantaoJin (Member Author) commented Feb 7, 2025:

Summary:
Aligned with all behaviours of the Splunk lookup command (all ITs have been cross-verified), except "Specify outputField to a new field in APPEND strategy". To resolve this case, we would have to extract the output attributes from the source plan (not the table). To avoid that, the logic keeps throwing an exception for now.

The latest description for outputField is now:

If you specify <outputField> with an existing field name in source query, its values will be replaced or appended by matched values from <inputField>. If the field specified in <outputField> is a new field, in REPLACE strategy, an extended new field will be applied to the results, but fail in APPEND strategy.

@penghuo @qianheng-aws please take another look.

@qianheng-aws (Contributor): LGTM

@penghuo (Collaborator) commented Feb 7, 2025:

> except "Specify outputField to a new field in APPEND strategy"

@LantaoJin Do you mean APPEND does not work with (<inputField> [AS <outputField>])? If yes, could you refine the lookup command doc and add a follow-up issue to track it.

@LantaoJin (Member Author) commented Feb 8, 2025:

No, only when AS introduces a new field in APPEND. Check the IT file, I have added the unsupported case. The lookup doc has been updated as well. @penghuo

A demo with the previous example:

Q1: source="fact.csv" | lookup "dim.csv" id REPLACE col1 as colA

An extended new field colA will be applied to the results:

id    col1  col2  colA
1     a     b     x
2     aa    bb    null
3     null  ccc   xx

Q2: source="fact.csv" | lookup "dim.csv" id APPEND col1 as colA

Throws "A column or function parameter with name colA cannot be resolved".

Actually, since colA is a new field that exists in neither fact nor dim, Q2 can be rewritten as Q1.
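
A sketch of Q1 (an assumed illustration, reusing the fact and dim DataFrames from the earlier sketch): the lookup table's col1 is projected as the brand-new colA, so unmatched rows get null:

val q1 = fact
  .join(dim.select($"id".as("lk_id"), $"col1".as("colA")), $"id" === $"lk_id", "left_outer")
  .drop("lk_id")

// id  col1  col2  colA
// 1   a     b     x
// 2   aa    bb    null
// 3   null  ccc   xx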

As a more complex case, the query

source="fact.csv" | lookup "dim.csv" id APPEND col1, col3 as colA

could be rewritten to

source="fact.csv" | lookup "dim.csv" id APPEND col1 | lookup "dim.csv" id REPLACE col3 as colA

@LantaoJin (Member Author) commented Feb 8, 2025:

Opened #1040 as a follow-up. (Since this failure case is a corner case, it can be worked around by query rewriting.) IMO, it's reasonable to throw an exception, so we can treat the fix as nice-to-have.

@penghuo (Collaborator) left a comment: Thx!

@penghuo merged commit 9925289 into opensearch-project:main on Feb 10, 2025
4 checks passed
Labels: Lang:PPL (Pipe Processing Language support)

Successfully merging this pull request may close these issues:

  • [BUG] The default behavior of PPL Lookup does not work as expected