From 346ce1d83a206c5938853db8f4b92bd2f2cdb4db Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 21 Feb 2025 22:22:21 +0800 Subject: [PATCH] fix iceberg predicate pushdown --- e2e_test/iceberg/test_case/iceberg_engine.slt | 26 +++ .../batch/batch_iceberg_predicate_pushdown.rs | 169 ++++++++++-------- 2 files changed, 118 insertions(+), 77 deletions(-) diff --git a/e2e_test/iceberg/test_case/iceberg_engine.slt b/e2e_test/iceberg/test_case/iceberg_engine.slt index 1856a39b26f59..548526e5b3db3 100644 --- a/e2e_test/iceberg/test_case/iceberg_engine.slt +++ b/e2e_test/iceberg/test_case/iceberg_engine.slt @@ -136,6 +136,32 @@ select count(_row_id) from t_without_pk; statement ok DROP TABLE t_without_pk +# test predicate pushdown, fix https://github.com/risingwavelabs/risingwave/issues/20462 +statement ok +CREATE TABLE price ( + id BIGINT, + price NUMERIC, + date DATE +) with(commit_checkpoint_interval = 1) ENGINE = ICEBERG; + +statement ok +INSERT INTO price (id, price, date) VALUES +(1, 100.50, '2025-02-01'), +(2, 200.75, '2025-02-05'), +(3, 150.25, '2025-02-10'); + +statement ok +FLUSH; + +query ??? rowsort +SELECT id, price, date FROM price WHERE date <= DATE '2025-02-10' - interval '5' day; +---- +1 100.5000000000 2025-02-01 +2 200.7500000000 2025-02-05 + +statement ok +DROP TABLE price; + statement ok DROP CONNECTION my_conn; diff --git a/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs index 567f82768f23e..c0d73d8165e14 100644 --- a/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs +++ b/src/frontend/src/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs @@ -23,7 +23,7 @@ use iceberg::spec::Datum as IcebergDatum; use risingwave_common::catalog::Field; use risingwave_common::types::{Decimal, ScalarImpl}; -use crate::expr::{ExprImpl, ExprType, Literal}; +use crate::expr::{Expr, ExprImpl, ExprType, Literal}; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary}; use crate::optimizer::rule::{BoxedRule, Rule}; @@ -84,8 +84,8 @@ fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option { }; Some(datum) } - ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_nanos( - t.0.and_utc().timestamp_nanos_opt()?, + ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros( + t.0.and_utc().timestamp_micros(), )), ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())), ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)), @@ -123,86 +123,98 @@ fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] - | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.equal_to(datum)) - } - _ => None, - }, - ExprType::NotEqual => match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] - | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.not_equal_to(datum)) - } - _ => None, - }, - ExprType::GreaterThan => match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.greater_than(datum)) - } - [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.less_than_or_equal_to(datum)) - } - _ => None, - }, - ExprType::GreaterThanOrEqual => match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.greater_than_or_equal_to(datum)) + ExprType::Equal if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.equal_to(datum)) + } + _ => None, } - [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.less_than(datum)) + } + ExprType::NotEqual if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] + | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.not_equal_to(datum)) + } + _ => None, } - _ => None, - }, - ExprType::LessThan => match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.less_than(datum)) + } + ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + _ => None, } - [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.greater_than_or_equal_to(datum)) + } + ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + _ => None, } - _ => None, - }, - ExprType::LessThanOrEqual => match [&args[0], &args[1]] { - [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.less_than_or_equal_to(datum)) + } + ExprType::LessThan if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than_or_equal_to(datum)) + } + _ => None, } - [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { - let column_name = &fields[lhs.index].name; - let reference = Reference::new(column_name); - let datum = rw_literal_to_iceberg_datum(rhs)?; - Some(reference.greater_than(datum)) + } + ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => { + match [&args[0], &args[1]] { + [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.less_than_or_equal_to(datum)) + } + [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => { + let column_name = &fields[lhs.index].name; + let reference = Reference::new(column_name); + let datum = rw_literal_to_iceberg_datum(rhs)?; + Some(reference.greater_than(datum)) + } + _ => None, } - _ => None, - }, + } ExprType::IsNull => match &args[0] { ExprImpl::InputRef(lhs) => { let column_name = &fields[lhs.index].name; @@ -225,6 +237,9 @@ fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option