Skip to content

Commit

Permalink
feat: predicate pushdown in parquet scan
Browse files: browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
  • Loading branch information
ion-elgreco authored and Liam Brannigan committed Feb 18, 2025
1 parent 80119fd commit 7b70aaf
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 28 deletions.
18 changes: 12 additions & 6 deletions crates/core/src/delta_datafusion/cdf/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,20 @@ impl TableProvider for DeltaCdfTableProvider {
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let session_state = session_state_from_session(session)?;
let mut plan = self.cdf_builder.build(session_state).await?;
let schema: DFSchema = self.schema().try_into()?;

let df_schema: DFSchema = plan.schema().try_into()?;
let mut plan = if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
let physical_expr = session.create_physical_expr(filter_expr, &schema)?;
let plan = self
.cdf_builder
.build(session_state, Some(&physical_expr))
.await?;
Arc::new(FilterExec::try_new(physical_expr, plan)?)
} else {
self.cdf_builder.build(session_state, None).await?
};

if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
let physical_expr = session.create_physical_expr(filter_expr, &df_schema)?;
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
}
let df_schema: DFSchema = plan.schema().try_into()?;

if let Some(projection) = projection {
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down Expand Up @@ -1059,7 +1059,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down
29 changes: 15 additions & 14 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ impl CdfLoadBuilder {
pub(crate) async fn build(
&self,
session_sate: &SessionState,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> DeltaResult<Arc<dyn ExecutionPlan>> {
let (cdc, add, remove) = self.determine_files_to_read().await?;
register_store(self.log_store.clone(), session_sate.runtime_env().clone());
Expand Down Expand Up @@ -388,7 +389,7 @@ impl CdfLoadBuilder {
table_partition_cols: cdc_partition_cols,
output_ordering: vec![],
},
None,
filters,
)
.await?;

Expand All @@ -406,7 +407,7 @@ impl CdfLoadBuilder {
table_partition_cols: add_remove_partition_cols.clone(),
output_ordering: vec![],
},
None,
filters,
)
.await?;

Expand All @@ -424,7 +425,7 @@ impl CdfLoadBuilder {
table_partition_cols: add_remove_partition_cols,
output_ordering: vec![],
},
None,
filters,
)
.await?;

Expand Down Expand Up @@ -502,7 +503,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await?;

let batches = collect_batches(
Expand Down Expand Up @@ -553,7 +554,7 @@ pub(crate) mod tests {
.load_cdf()
.with_starting_version(0)
.with_ending_timestamp(starting_timestamp.and_utc())
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.unwrap();

Expand Down Expand Up @@ -599,7 +600,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await?;

let batches = collect_batches(
Expand Down Expand Up @@ -652,7 +653,7 @@ pub(crate) mod tests {
.load_cdf()
.with_starting_version(4)
.with_ending_version(1)
.build(&ctx.state())
.build(&ctx.state(), None)
.await;

assert!(table.is_err());
Expand All @@ -671,7 +672,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_version(5)
.build(&ctx.state())
.build(&ctx.state(), None)
.await;

assert!(table.is_err());
Expand All @@ -691,7 +692,7 @@ pub(crate) mod tests {
.load_cdf()
.with_starting_version(5)
.with_allow_out_of_range()
.build(&ctx.state())
.build(&ctx.state(), None)
.await?;

let batches = collect_batches(
Expand All @@ -714,7 +715,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_timestamp(ending_timestamp.and_utc())
.build(&ctx.state())
.build(&ctx.state(), None)
.await;

assert!(table.is_err());
Expand All @@ -735,7 +736,7 @@ pub(crate) mod tests {
.load_cdf()
.with_starting_timestamp(ending_timestamp.and_utc())
.with_allow_out_of_range()
.build(&ctx.state())
.build(&ctx.state(), None)
.await?;

let batches = collect_batches(
Expand All @@ -757,7 +758,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await;

assert!(table.is_err());
Expand All @@ -777,7 +778,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_timestamp(ending_timestamp.and_utc())
.build(&ctx.state())
.build(&ctx.state(), None)
.await?;

let batches = collect_batches(
Expand Down Expand Up @@ -868,7 +869,7 @@ pub(crate) mod tests {
let cdf_scan = DeltaOps(table.clone())
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4081,7 +4081,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down Expand Up @@ -4198,7 +4198,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down Expand Up @@ -4286,7 +4286,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down Expand Up @@ -1365,7 +1365,7 @@ mod tests {
let table = DeltaOps(table)
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ mod tests {
let cdf_scan = DeltaOps(table.clone())
.load_cdf()
.with_starting_version(0)
.build(&ctx.state())
.build(&ctx.state(), None)
.await
.expect("Failed to load CDF");

Expand Down
1 change: 1 addition & 0 deletions python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def test_read_cdf_partitioned_with_predicate():
assert len(values) == 1
assert values[0] == date(2023, 12, 25)


def test_read_cdf_partitioned():
dt = DeltaTable("../crates/test/tests/data/cdf-table/")
b = dt.load_cdf(0, 3).read_all().to_pydict()
Expand Down

0 comments on commit 7b70aaf

Please sign in to comment.