[BUG] Fix the PPL Lookup command behavior when inputField is missing and REPLACE with existing fields #1035
Conversation
…and REPLACE with existing fields Signed-off-by: Lantao Jin <ltjin@amazon.com>
@@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
     extensions.injectParser { (spark, parser) =>
-      new FlintSparkPPLParser(parser)
+      new FlintSparkPPLParser(parser, spark)
The SparkSession in use is passed in from here.
boolean sourceTableExists = spark.sessionState().catalog().tableExists(sourceTableIdentifier);
if (sourceTableExists) {
  try {
    CatalogTable table = spark.sessionState().catalog().getTableMetadata(getTableIdentifier(tableQualifiedName));
This is just a metadata call via the external catalog obtained from the SparkSession passed in. No additional Spark job will be submitted.
Would it make sense to make this metadata discovery more generic, so that all relations have this info?
Yes, if we see more requirements later. For now I would rather leave the metadata binding to Spark internals unless necessary.
@@ -429,4 +415,32 @@ class FlintSparkPPLLookupITSuite
     comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
   }

   test("test LOOKUP lookupTable name") {
expectedResults should only have one occupation column, but it has 2 occupation columns now.
IMO, the lookup command enriches your fact table by pulling in columns from a lookup (dimension) table. If the same column name appears in both tables, you can choose one of two conflict-resolution strategies:
- REPLACE overwrites values in the source column with values from the lookup table.
- APPEND only writes values to the source column if the column is currently missing or null.
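The two strategies can be sketched as per-cell merge functions; this is a hypothetical illustration of the semantics described above, not the actual implementation:

```java
// Hypothetical per-cell sketch of the two conflict-resolution strategies
// for a column that exists in both the source and the lookup table.
public class LookupStrategies {
    // REPLACE: the value from the lookup table overwrites the source value.
    static String replace(String sourceValue, String lookupValue) {
        return lookupValue;
    }

    // APPEND: the lookup value is used only when the source value is missing.
    static String append(String sourceValue, String lookupValue) {
        return sourceValue == null ? lookupValue : sourceValue;
    }

    public static void main(String[] args) {
        System.out.println(replace("engineer", "doctor")); // doctor
        System.out.println(append("engineer", "doctor"));  // engineer
        System.out.println(append(null, "doctor"));        // doctor
    }
}
```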
Let me double-check the behaviour of SPL. My understanding is that we keep 2 occupation columns, because we don't know which strategy to apply for the command | LOOKUP lookupTable name.
When using the lookup command, if an OUTPUT or OUTPUTNEW clause is not specified, all of the fields in the lookup table that are not the match fields are used as output fields. If the OUTPUT clause is specified, the output lookup fields overwrite existing fields. If the OUTPUTNEW clause is specified, the lookup is not performed for events in which the output fields already exist.
Is OUTPUT the default option? | LOOKUP lookupTable name is equivalent to | LOOKUP lookupTable name REPLACE dimensionTable.*, so column conflict resolution should be applied.
Btw, if the output includes 2 occupation columns, isn't that a naming conflict?
You are right, the occupation field from the source should be replaced with the occupation field from the lookup if there are duplicated columns. Double-confirmed via SPL.
But achieving this goal may bring more complexity than expected, because what is duplicated is not the field names of the lookup table versus the source table, but the field names of the lookup table versus the output of the source plan. Our current implementation passes an unresolved logical plan to Spark, at which point we have not yet obtained the output of the source plan.
@penghuo , I have fixed this to address your comment in 2e954b2. Here is the new behaviour:
source
id | col1 | col2 |
---|---|---|
1 | a | b |
2 | aa | bb |
lookup
id | col1 | col3 |
---|---|---|
1 | x | y |
3 | xx | yy |
index=source | lookup id
will output
id | col1 | col2 | col3 |
---|---|---|---|
1 | x | b | y |
2 | null | bb | null |
Notice the value of col1 for row 2 is null instead of aa; this behavior produces the same result as SPL's in my testing (although it looks a bit strange).
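The example can be reproduced with a minimal stand-alone sketch (plain Java, no Spark; all names hypothetical): a hash join on id where the conflicted column col1 is taken from the lookup side unconditionally, which is why the unmatched row 2 ends up with null.

```java
import java.util.*;

public class LookupDefaultDemo {
    // Returns rows of (id, col1, col2, col3) for `| lookup ... id` with no
    // REPLACE/APPEND clause: the conflicted col1 always comes from the lookup side.
    static List<String[]> lookup(List<String[]> source, Map<String, String[]> dim) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : source) {
            String[] hit = dim.get(row[0]);            // match on id
            String col1 = hit == null ? null : hit[0]; // lookup wins, even when unmatched
            String col3 = hit == null ? null : hit[1];
            out.add(new String[]{row[0], col1, row[2], col3});
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> source = List.of(
            new String[]{"1", "a", "b"},
            new String[]{"2", "aa", "bb"});
        Map<String, String[]> dim = Map.of(
            "1", new String[]{"x", "y"},
            "3", new String[]{"xx", "yy"});
        for (String[] r : lookup(source, dim)) {
            List<String> cells = new ArrayList<>();
            for (String v : r) cells.add(v == null ? "null" : v);
            System.out.println(String.join(" | ", cells));
        }
        // prints:
        // 1 | x | b | y
        // 2 | null | bb | null
    }
}
```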
Agree, the behavior is strange, but let's align with it.
Updating my understanding of lookup based on an SPL test.
The lookup command enriches your fact table by performing a left join with a lookup (dimension) table. Specifically, it:
- Joins the fact table with the lookup table based on a join key (lookupMappingField). By default, the command appends all columns from the lookup table (except the join key) to the fact table. Alternatively, you can explicitly specify the desired output fields.
- Resolves column name conflicts between the two tables using one of two strategies:
- REPLACE: The lookup table’s value replaces the fact table’s value.
- APPEND: The lookup table’s value is used only if the corresponding fact table column is missing or null.
One highlight is,
- fact table
id | col1 | col2 |
---|---|---|
1 | a | b |
2 | aa | bb |
3 | null | ccc |
- dimension table
id | col1 | col3 |
---|---|---|
1 | x | y |
3 | xx | yy |
With source="fact.csv" | lookup "dim.csv" id OUTPUT id, col1, col3, the fact table's id column is replaced with the dimension table's id column, and the result is:
id | col1 | col2 | col3 |
---|---|---|---|
1 | x | b | y |
null | null | bb | null |
3 | xx | ccc | yy |
The output value null for fields id and col1 of row 2 really surprised me. In our implementation, only the matched rows (explicitly joined rows) have the output strategies applied (whether replace or append). So the output of source="fact.csv" | lookup "dim.csv" id REPLACE id, col1, col3 is:
id | col1 | col2 | col3 |
---|---|---|---|
1 | x | b | y |
2 | aa | bb | null |
3 | xx | ccc | yy |
But it seems in Splunk, whether a row is matched or not, the conflicted column from the source will be replaced with the new value from the lookup. @penghuo will we refactor to align with Splunk's output?
Aligned all behaviours in 42eb9e0
Yes, it is not intuitive, but let's align with SPL.
Apart from a minor question - LGTM
Signed-off-by: Lantao Jin <ltjin@amazon.com>
// the matched values of `col1` from the lookup table will replace the values of `col1` from the source.
Set<Field> intersection = new HashSet<>(RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), lookupTable));
LOG.debug("The fields list in lookup table are {}", intersection);
List<Field> sourceOutput = RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), searchSide);
What if searchSide is not a table and its output differs from the original fields of the catalog relation?
Yes. I am thinking about how to resolve the output of a plan without submitting a Spark job. Any ideas?
If we don't need to fall back to the source value on a mismatch, a simple idea is to ignore the output fields of searchSide; it won't throw an exception, although we want to drop the columns that don't exist in the output of a plan.
Otherwise, it seems we always need an analyzed plan to handle the fallback logic.
Yes, I wonder why I initially got stuck on trying to obtain the attributes of the source table.
+1 for @qianheng-aws's suggestion on ignoring the output fields of searchSide.
Found another new issue on the lookup command. The PPL below got an exception:
But it should be supported as stated in the doc:
Good catch! It's because the current implementation
// lookup mapping keys don't need to be dropped here; they will be checked later.
intersection.removeAll(mappingFieldsOfLookup);
intersection.retainAll(sourceOutput);
List<Expression> duplicated = buildProjectListFromFields(new ArrayList<>(intersection), expressionAnalyzer, context)
If we drop the duplicated columns directly, then its behavior differs from what we do with explicit replacement here:
Lines 101 to 102 in 785d02b
// The result output project list we build here is used to replace the source output,
// for the unmatched rows of left outer join, the outputs are null, so fall back to source output.
That's why I said it is strange in SPL, see #1035 (comment).
Not sure whether it is a bug in SPL or just the designed behaviour.
If we follow that behavior, I think it will make the logic simpler. Then we need to change the logic in buildOutputProjectList as well to align.
If we follow that behavior, I think it will make the logic simpler. Then we need to change the logic in buildOutputProjectList as well to align.

No, this strange behavior only happens for the duplicated fields when the REPLACE/APPEND clause is missing (that is why I call it strange). The fall back to source output behavior still exists for the REPLACE/APPEND clause.
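The fall-back-to-source behaviour for an explicit REPLACE can be sketched as a coalesce over the left-outer-join result. Below is a minimal stand-alone illustration (plain Java, no Spark; all names hypothetical, not the actual project-list implementation) reproducing the REPLACE id, col1, col3 table earlier in the thread:

```java
import java.util.*;

public class LookupReplaceDemo {
    // coalesce: prefer the lookup value, but fall back to the source value
    // for unmatched rows (where the left outer join produced null).
    static String coalesce(String lookupValue, String sourceValue) {
        return lookupValue != null ? lookupValue : sourceValue;
    }

    // `REPLACE id, col1, col3` over a left outer join on id.
    static List<String[]> lookupReplace(List<String[]> fact, Map<String, String[]> dim) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : fact) {
            String[] hit = dim.get(row[0]);
            String col1 = coalesce(hit == null ? null : hit[0], row[1]);
            String col3 = hit == null ? null : hit[1]; // col3 only exists in the lookup table
            out.add(new String[]{row[0], col1, row[2], col3});
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> fact = List.of(
            new String[]{"1", "a", "b"},
            new String[]{"2", "aa", "bb"},
            new String[]{"3", null, "ccc"});
        Map<String, String[]> dim = Map.of(
            "1", new String[]{"x", "y"},
            "3", new String[]{"xx", "yy"});
        for (String[] r : lookupReplace(fact, dim)) {
            List<String> cells = new ArrayList<>();
            for (String v : r) cells.add(v == null ? "null" : v);
            System.out.println(String.join(" | ", cells));
        }
        // prints:
        // 1 | x | b | y
        // 2 | aa | bb | null
        // 3 | xx | ccc | yy
    }
}
```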
Signed-off-by: Lantao Jin <ltjin@amazon.com>
One option (the latest commit) is to keep the current query failure behaviour to simplify the logic. I have updated the doc description.
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Summary: The latest description for
@penghuo @qianheng-aws please take another look.
LGTM
@LantaoJin Do you mean APPEND does not work with ( [AS ? If yes, could you refine the lookup command doc and add a follow-up issue to track it.
No, only if AS introduces a new field in APPEND. Check the IT file; I have added the unsupported case. The lookup doc has been updated as well. @penghuo Demo with the previous example
an extended new field
Throws with "A column or function parameter with name Actually, as the More complexity, for query
could be rewritten to
Opened #1040 as a follow-up. (Since this failure case is a corner case, it can be worked around by query rewriting.) IMO, it's reasonable to throw an exception, so we could treat the fix as nice to have.
Thx!
Description
issue-1:
The current behaviour of inputField is: "If you don't specify any <inputField>, all fields of <lookupIndex> where matched values are applied to result output."
The correct behaviour should be: "If you don't specify any <inputField>, all fields of <lookupIndex> that are not the mapping fields where matched values are applied to result output."
issue-2:
A known bug. The existing IT "test LOOKUP lookupTable name REPLACE occupation" provided a workaround. This PR fixes the issue and provides a new IT "test LOOKUP lookupTable name REPLACE occupation - 2".
Related Issues
Resolves #1026
Check List
- Commits are signed per the DCO using --signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.