Prepare for review
Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen committed Dec 28, 2023
1 parent 709e44e commit aafd0b0
Showing 3 changed files with 24 additions and 33 deletions.
@@ -26,10 +26,10 @@ case class ValueSetSkippingStrategy(
 
   override def getAggregators: Seq[Expression] = {
     val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
-    val collectSetLimit = collect_set(columnName)
+    val collectSet = collect_set(columnName)
     val aggregator =
-      when(size(collectSetLimit) > limit, lit(null))
-        .otherwise(collectSetLimit)
+      when(size(collectSet) > limit, lit(null))
+        .otherwise(collectSet)
     Seq(aggregator.expr)
   }
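
To make the aggregator's size-limit behavior concrete, here is a minimal standalone sketch (not part of this commit; the local SparkSession, the limit of 2, and the sample rows are assumptions for illustration). Once the number of distinct values exceeds the limit, the whole value set collapses to null, which is exactly the case the rewritten predicate in the next hunk has to handle:

// Minimal standalone sketch (assumptions noted above), not from the commit.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, lit, size, when}

object ValueSetAggregatorSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("value-set-sketch").getOrCreate()
  import spark.implicits._

  val limit = 2 // illustrative; stands in for DEFAULT_VALUE_SET_SIZE_LIMIT
  val df = Seq("Seattle", "Portland", "Vancouver").toDF("address")
  val collectSet = collect_set($"address")

  // Mirrors getAggregators above: a value set larger than the limit is
  // replaced by null, signalling "too many distinct values to track".
  df.agg(when(size(collectSet) > limit, lit(null)).otherwise(collectSet).as("value_set"))
    .show(false) // prints null: 3 distinct cities exceed the limit of 2
}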

@@ -41,7 +41,7 @@ case class ValueSetSkippingStrategy(
     predicate match {
       case EqualTo(AttributeReference(`columnName`, _, _, _), value: Literal) =>
         // Value set may be null due to maximum size limit restriction
-        Some((col(columnName) === value || isnull(col(columnName))).expr)
+        Some((isnull(col(columnName)) || col(columnName) === value).expr)
       case _ => None
     }
   }
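The reordering itself is cosmetic since boolean OR commutes, but the isnull disjunct is essential: a null value set means the file's values overflowed the limit and are unknown, so the file must be kept rather than skipped. Below is a rough plain-Spark illustration of that semantics (the data and names are assumptions; as I understand it, the real filter pushes col === value to the OpenSearch-backed index, where a multi-valued field matches if any element equals the literal, so array_contains stands in for that behavior here):

// Standalone illustration (assumptions noted above), not from the commit.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, isnull}

object ValueSetRewriteSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("rewrite-sketch").getOrCreate()
  import spark.implicits._

  // One row per source file: its collected value set, or null when the set overflowed.
  val index = Seq(
    ("file-1", Seq("Seattle", "Portland")), // value set fits within the limit
    ("file-2", null: Seq[String])           // overflowed: contents unknown
  ).toDF("file", "address")

  // isnull(...) first, as in the rewritten predicate: keep files whose value
  // set is unknown, plus any file whose value set contains the queried value.
  index
    .filter(isnull(col("address")) || array_contains(col("address"), "Portland"))
    .select("file")
    .show() // keeps file-1 (contains Portland) and file-2 (unknown)
}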
@@ -25,7 +25,7 @@ class ValueSetSkippingStrategySuite
 
   test("should rewrite EqualTo(<indexCol>, <value>)") {
     EqualTo(name, Literal("hello")) shouldRewriteTo
-      (col("name") === "hello" || isnull(col("name")))
+      (isnull(col("name")) || col("name") === "hello")
   }
 
   test("should not rewrite predicate with other column") {
@@ -267,27 +267,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4))
   }
 
-  test("can build value set skipping index and rewrite applicable query") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addValueSet("address")
-      .create()
-    flint.refreshIndex(testIndex, FULL)
-
-    val query = sql(s"""
-         | SELECT name
-         | FROM $testTable
-         | WHERE address = 'Portland'
-         |""".stripMargin)
-
-    checkAnswer(query, Row("World"))
-    query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("address") === "Portland" || isnull(col("address"))))
-  }
-
-  test("can build value set skipping index up to default limit") {
+  test("can build value set skipping index up to limit and rewrite applicable query") {
    val testTable2 = "spark_catalog.default.value_set_test"
    val testIndex2 = getSkippingIndexName(testTable2)
    val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
@@ -301,29 +281,40 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
          | PARTITION (year=2023, month=4)
          | SELECT /*+ COALESCE(1) */ *
          | FROM VALUES
-         | ('Hello', 30, 'Seattle'),
-         | ('World', 40, 'Portland')
+         | ('Hello', 20, 'Seattle'),
+         | ('World', 30, 'Portland')
          |""".stripMargin)
      sql(s"""
          | INSERT OVERWRITE $testTable2
          | PARTITION (year=2023, month=5)
          | SELECT /*+ COALESCE(1) */ *
          | FROM VALUES
          | ('Hello', 30, 'Seattle'),
-         | ('World', 40, 'Portland'),
-         | ('Test', 50, 'Vancouver')
+         | ('World', 50, 'Portland'),
+         | ('Test', 60, 'Vancouver')
          |""".stripMargin)
 
      // Build value set with maximum size 2
      flint
        .skippingIndex()
        .onTable(testTable2)
        .addValueSet("address")
        .create()
      flint.refreshIndex(testIndex2, FULL)
 
      checkAnswer(
        flint.queryIndex(testIndex2).select("address"),
        Seq(Row("""["Seattle","Portland"]"""), Row(null)))
+
+     // Rewrite query and verify it works with the value set (may be null)
+     val query = sql(s"""
+          | SELECT age
+          | FROM $testTable2
+          | WHERE address = 'Portland'
+          |""".stripMargin)
+     query.queryExecution.executedPlan should
+       useFlintSparkSkippingFileIndex(
+         hasIndexFilter(isnull(col("address")) || col("address") === "Portland"))
+     checkAnswer(query, Seq(Row(30), Row(50)))
    } finally {
      ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit
      flint.deleteIndex(testIndex2)
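
For context, the surrounding try/finally (partially hidden above) appears to temporarily lower the mutable DEFAULT_VALUE_SET_SIZE_LIMIT to 2 and restore the saved default afterwards. A hypothetical helper capturing that save-override-restore pattern might look like this (the name withValueSetLimit and its placement in the suite are assumptions, not part of the commit):

// Hypothetical helper (name assumed; not in this commit): run a test block
// under a temporary value-set size limit, restoring the saved default even
// if the block throws, mirroring the try/finally pattern in this test.
def withValueSetLimit[T](limit: Int)(block: => T): T = {
  val saved = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
  ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = limit
  try block
  finally ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = saved
}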
@@ -663,8 +654,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     checkAnswer(query, Row("sample varchar", paddedChar))
     query.queryExecution.executedPlan should
       useFlintSparkSkippingFileIndex(
-        hasIndexFilter((col("varchar_col") === "sample varchar" || isnull(col("varchar_col"))) &&
-          (col("char_col") === paddedChar || isnull(col("char_col")))))
+        hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
+          (isnull(col("char_col")) || col("char_col") === paddedChar)))
 
     flint.deleteIndex(testIndex)
   }
