diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 49cde9783b3a..2ce78d38b979 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -76,8 +77,9 @@ impl WindowedSortPhysicalRule { let preserve_partitioning = sort_exec.preserve_partitioning(); - let Some(scanner_info) = fetch_partition_range(sort_exec.input().clone())? - else { + let sort_input = remove_repartition(sort_exec.input().clone())?.data; + // Gets scanner info from the input without repartition before filter. + let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else { return Ok(Transformed::no(plan)); }; @@ -99,13 +101,13 @@ impl WindowedSortPhysicalRule { let new_input = if scanner_info.tag_columns.is_empty() && !first_sort_expr.options.descending { - sort_exec.input().clone() + sort_input } else { Arc::new(PartSortExec::new( first_sort_expr.clone(), sort_exec.fetch(), scanner_info.partition_ranges.clone(), - sort_exec.input().clone(), + sort_input, )) }; @@ -194,3 +196,24 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult, +) -> DataFusionResult>> { + plan.transform_down(|plan| { + if plan.as_any().is::() { + // Checks child. + let maybe_repartition = plan.children()[0]; + if maybe_repartition.as_any().is::() { + let maybe_scan = maybe_repartition.children()[0]; + if maybe_scan.as_any().is::() { + let new_filter = plan.clone().with_new_children(vec![maybe_scan.clone()])?; + return Ok(Transformed::yes(new_filter)); + } + } + } + + Ok(Transformed::no(plan)) + }) +} diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result index eba412dba013..ad185e642fd5 100644 --- a/tests/cases/standalone/common/order/order_by.result +++ b/tests/cases/standalone/common/order/order_by.result @@ -297,17 +297,17 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED -|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED +|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED +|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| | 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED -|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED +|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED +|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 2_| diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index bf5cabdad816..10613d2f41b9 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -106,6 +106,107 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; |_|_| Total rows: 5_| +-+-+-+ +-- Filter on a field. +SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; + ++---+-------------------------+ +| i | t | ++---+-------------------------+ +| 3 | 1970-01-01T00:00:00.007 | +| 3 | 1970-01-01T00:00:00.008 | +| 3 | 1970-01-01T00:00:00.009 | +| 4 | 1970-01-01T00:00:00.010 | ++---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED +|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=4 REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: i@0 > 2 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Filter on a field. +SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; + ++---+-------------------------+ +| i | t | ++---+-------------------------+ +| 4 | 1970-01-01T00:00:00.012 | +| 4 | 1970-01-01T00:00:00.011 | +| 4 | 1970-01-01T00:00:00.010 | +| 3 | 1970-01-01T00:00:00.009 | ++---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED +|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=4 REDACTED +|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=4 REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: i@0 > 2 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + +-- Filter on the time index. +SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; + ++---+-------------------------+ +| i | t | ++---+-------------------------+ +| 4 | 1970-01-01T00:00:00.012 | +| 4 | 1970-01-01T00:00:00.011 | +| 4 | 1970-01-01T00:00:00.010 | +| 3 | 1970-01-01T00:00:00.009 | ++---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED +|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED +|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: t@1 > 8 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + DROP TABLE test; Affected Rows: 0 @@ -219,6 +320,39 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; |_|_| Total rows: 5_| +-+-+-+ +-- Filter on a pk column. +SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; + ++----+---+-------------------------+ +| pk | i | t | ++----+---+-------------------------+ +| 8 | 3 | 1970-01-01T00:00:00.008 | +| 9 | 3 | 1970-01-01T00:00:00.009 | +| 10 | 4 | 1970-01-01T00:00:00.010 | +| 11 | 4 | 1970-01-01T00:00:00.011 | +| 12 | 4 | 1970-01-01T00:00:00.012 | ++----+---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED +|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 limit=5 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 5_| ++-+-+-+ + DROP TABLE test_pk; Affected Rows: 0 diff --git a/tests/cases/standalone/common/order/windowed_sort.sql b/tests/cases/standalone/common/order/windowed_sort.sql index e21ae3764bdb..13303e8f0e0e 100644 --- a/tests/cases/standalone/common/order/windowed_sort.sql +++ b/tests/cases/standalone/common/order/windowed_sort.sql @@ -33,6 +33,36 @@ SELECT * FROM test ORDER BY t DESC LIMIT 5; -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; +-- Filter on a field. +SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; + +-- Filter on a field. +SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; + +-- Filter on the time index. +SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; + DROP TABLE test; -- Test with PK, with a windowed sort query. @@ -70,4 +100,14 @@ SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; +-- Filter on a pk column. +SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; + DROP TABLE test_pk;