Skip to content

Commit

Permalink
Add Join and Lookup IT on OpenSearch Table (#1025)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Feb 5, 2025
1 parent 98579e1 commit 785d02b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/opensearch-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```

Join two indices
```scala
val df = spark.sql("SELECT * FROM dev.default.my_index as t1 JOIN dev.default.my_index as t2 ON t1.id == t2.id")
df.show()
```

## Limitation
### catalog operation
- List Tables: Not supported.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

import org.opensearch.flint.spark.ppl.FlintPPLSuite

import org.apache.spark.sql.Row

/**
 * Query tests against OpenSearch tables addressed as `<catalogName>.default.<index>`:
 * one Spark SQL join and one PPL lookup.
 */
class OpenSearchTableQueryITSuite extends OpenSearchCatalogSuite with FlintPPLSuite {

  /**
   * Runs `body` inside nested `withIndexName` scopes for both index names, after calling
   * `simpleIndex` on each of them (first index first).
   *
   * @param first  name of the outer index
   * @param second name of the inner index
   * @param body   assertions to run once both indices have been set up
   */
  private def withTwoSimpleIndices(first: String, second: String)(body: => Unit): Unit =
    withIndexName(first) {
      withIndexName(second) {
        simpleIndex(first)
        simpleIndex(second)
        body
      }
    }

  test("SQL Join two indices") {
    val leftIndex = "t0001"
    val rightIndex = "t0002"

    withTwoSimpleIndices(leftIndex, rightIndex) {
      // The query text is kept flush-left so the string handed to Spark is unchanged.
      val joinQuery = s"""
SELECT t1.accountId, t2.eventName, t2.eventSource
FROM ${catalogName}.default.$leftIndex as t1 JOIN ${catalogName}.default.$rightIndex as t2 ON
t1.accountId == t2.accountId"""
      val joined = spark.sql(joinQuery)

      // Expect exactly one joined row.
      checkAnswer(joined, Row("123", "event", "source"))
    }
  }

  test("PPL Lookup") {
    val factIndex = "t0001"
    val lookupIndex = "t0002"
    // Fully qualified table names used inside the PPL statement.
    val factTable = s"${catalogName}.default.$factIndex"
    val lookupTable = s"${catalogName}.default.$lookupIndex"

    withTwoSimpleIndices(factIndex, lookupIndex) {
      val pplQuery =
        s"source = $factTable | stats count() by accountId " +
          s"| LOOKUP $lookupTable accountId REPLACE eventSource"
      val looked = spark.sql(pplQuery)

      // Expect exactly one row: the count, the accountId and the looked-up eventSource.
      checkAnswer(looked, Row(1, "123", "source"))
    }
  }
}

0 comments on commit 785d02b

Please sign in to comment.