diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index de5367e682..1299267616 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -83,6 +83,7 @@ use crate::operations::cast::merge_schema::{merge_arrow_field, merge_arrow_schem use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; use crate::operations::transaction::CommitBuilder; +use crate::operations::write::execution::write_execution_plan_v2; use crate::operations::write::generated_columns::{ add_generated_columns, add_missing_generated_columns, }; @@ -90,11 +91,9 @@ use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, MergePredicate}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; -use writer::write_execution_plan_v2; mod barrier; mod filter; -mod writer; const SOURCE_COLUMN: &str = "__delta_rs_source"; const TARGET_COLUMN: &str = "__delta_rs_target"; diff --git a/crates/core/src/operations/merge/writer.rs b/crates/core/src/operations/merge/writer.rs deleted file mode 100644 index 2c563abd10..0000000000 --- a/crates/core/src/operations/merge/writer.rs +++ /dev/null @@ -1,248 +0,0 @@ -//! Writer for MERGE operation, can write normal and CDF data at same time - -use std::sync::Arc; -use std::vec; - -use arrow::compute::concat_batches; -use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef as ArrowSchemaRef}; -use datafusion::catalog::TableProvider; -use datafusion::datasource::MemTable; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; -use datafusion_expr::{col, lit}; -use datafusion_physical_plan::ExecutionPlan; -use futures::StreamExt; -use object_store::prefix::PrefixStore; -use parquet::file::properties::WriterProperties; -use tracing::log::*; - -use crate::operations::cdc::CDC_COLUMN_NAME; -use crate::operations::writer::{DeltaWriter, WriterConfig}; - -use crate::delta_datafusion::DeltaDataChecker; -use crate::errors::DeltaResult; -use crate::kernel::{Action, AddCDCFile, StructType, StructTypeExt}; - -use crate::operations::write::{WriteError, WriterStatsConfig}; -use crate::storage::ObjectStoreRef; -use crate::table::state::DeltaTableState; - -use tokio::sync::mpsc::Sender; - -#[allow(clippy::too_many_arguments)] -pub(crate) async fn write_execution_plan_v2( - snapshot: Option<&DeltaTableState>, - state: SessionState, - plan: Arc, - partition_columns: Vec, - object_store: ObjectStoreRef, - target_file_size: Option, - write_batch_size: Option, - writer_properties: Option, - writer_stats_config: WriterStatsConfig, - sender: Option>, - contains_cdc: bool, -) -> DeltaResult> { - // We always take the plan Schema since the data may contain Large/View arrow types, - // the schema and batches were prior constructed with this in mind. - let schema: ArrowSchemaRef = plan.schema(); - let checker = if let Some(snapshot) = snapshot { - DeltaDataChecker::new(snapshot) - } else { - debug!("Using plan schema to derive generated columns, since no snapshot was provided. 
Implies first write."); - let delta_schema: StructType = schema.as_ref().try_into()?; - DeltaDataChecker::new_with_generated_columns( - delta_schema.get_generated_columns().unwrap_or_default(), - ) - }; - - // Write data to disk - let mut tasks = vec![]; - if !contains_cdc { - for i in 0..plan.properties().output_partitioning().partition_count() { - let inner_plan = plan.clone(); - let inner_schema = schema.clone(); - let task_ctx = Arc::new(TaskContext::from(&state)); - let config = WriterConfig::new( - inner_schema.clone(), - partition_columns.clone(), - writer_properties.clone(), - target_file_size, - write_batch_size, - writer_stats_config.num_indexed_cols, - writer_stats_config.stats_columns.clone(), - ); - let mut writer = DeltaWriter::new(object_store.clone(), config); - let checker_stream = checker.clone(); - let sender_stream = sender.clone(); - let mut stream = inner_plan.execute(i, task_ctx)?; - - let handle: tokio::task::JoinHandle>> = tokio::task::spawn( - async move { - let sendable = sender_stream.clone(); - while let Some(maybe_batch) = stream.next().await { - let batch = maybe_batch?; - - checker_stream.check_batch(&batch).await?; - - if let Some(s) = sendable.as_ref() { - if let Err(e) = s.send(batch.clone()).await { - error!("Failed to send data to observer: {e:#?}"); - } - } else { - debug!("write_execution_plan_with_predicate did not send any batches, no sender."); - } - writer.write(&batch).await?; - } - let add_actions = writer.close().await; - match add_actions { - Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::>()), - Err(err) => Err(err), - } - }, - ); - tasks.push(handle); - } - } else { - // Incoming plan contains the normal write_plan unioned with the cdf plan - // we split these batches during the write - let cdf_store = Arc::new(PrefixStore::new(object_store.clone(), "_change_data")); - for i in 0..plan.properties().output_partitioning().partition_count() { - let inner_plan = plan.clone(); - let write_schema = Arc::new(Schema::new( - schema - .clone() - .fields() - .into_iter() - .filter_map(|f| { - if f.name() != CDC_COLUMN_NAME { - Some(f.as_ref().clone()) - } else { - None - } - }) - .collect::>(), - )); - let cdf_schema = schema.clone(); - let task_ctx = Arc::new(TaskContext::from(&state)); - let normal_config = WriterConfig::new( - write_schema.clone(), - partition_columns.clone(), - writer_properties.clone(), - target_file_size, - write_batch_size, - writer_stats_config.num_indexed_cols, - writer_stats_config.stats_columns.clone(), - ); - - let cdf_config = WriterConfig::new( - cdf_schema.clone(), - partition_columns.clone(), - writer_properties.clone(), - target_file_size, - write_batch_size, - writer_stats_config.num_indexed_cols, - writer_stats_config.stats_columns.clone(), - ); - - let mut writer = DeltaWriter::new(object_store.clone(), normal_config); - - let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config); - - let checker_stream = checker.clone(); - let sender_stream = sender.clone(); - let mut stream = inner_plan.execute(i, task_ctx)?; - - let session_context = SessionContext::new(); - - let handle: tokio::task::JoinHandle>> = tokio::task::spawn( - async move { - let sendable = sender_stream.clone(); - while let Some(maybe_batch) = stream.next().await { - let batch = maybe_batch?; - - // split batch since we unioned upstream the operation write and cdf plan - let table_provider: Arc = Arc::new(MemTable::try_new( - batch.schema(), - vec![vec![batch.clone()]], - )?); - let batch_df = 
session_context.read_table(table_provider).unwrap(); - - let normal_df = batch_df - .clone() - .filter(col(CDC_COLUMN_NAME).in_list( - vec![lit("delete"), lit("source_delete"), lit("update_preimage")], - true, - ))? - .drop_columns(&[CDC_COLUMN_NAME])?; - - let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list( - vec![ - lit("delete"), - lit("insert"), - lit("update_preimage"), - lit("update_postimage"), - ], - false, - ))?; - - let normal_batch = - concat_batches(&write_schema, &normal_df.collect().await?)?; - checker_stream.check_batch(&normal_batch).await?; - - let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?; - checker_stream.check_batch(&cdf_batch).await?; - - if let Some(s) = sendable.as_ref() { - if let Err(e) = s.send(batch.clone()).await { - error!("Failed to send data to observer: {e:#?}"); - } - } else { - debug!("write_execution_plan_with_predicate did not send any batches, no sender."); - } - writer.write(&normal_batch).await?; - cdf_writer.write(&cdf_batch).await?; - } - let mut add_actions = writer - .close() - .await? - .into_iter() - .map(Action::Add) - .collect::>(); - let cdf_actions = cdf_writer.close().await.map(|v| { - v.into_iter() - .map(|add| { - { - Action::Cdc(AddCDCFile { - // This is a gnarly hack, but the action needs the nested path, not the - // path isnide the prefixed store - path: format!("_change_data/{}", add.path), - size: add.size, - partition_values: add.partition_values, - data_change: false, - tags: add.tags, - }) - } - }) - .collect::>() - })?; - add_actions.extend(cdf_actions); - Ok(add_actions) - }, - ); - tasks.push(handle); - } - } - let actions = futures::future::join_all(tasks) - .await - .into_iter() - .collect::, _>>() - .map_err(|err| WriteError::WriteTask { source: err })? - .into_iter() - .collect::, _>>()? 
- .concat() - .into_iter() - .collect::>(); - // Collect add actions to add to commit - Ok(actions) -} diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs index 3a5796d964..74fd18719b 100644 --- a/crates/core/src/operations/write/execution.rs +++ b/crates/core/src/operations/write/execution.rs @@ -5,7 +5,7 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use datafusion::datasource::provider_as_source; use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::prelude::DataFrame; -use datafusion_expr::{lit, Expr, LogicalPlanBuilder}; +use datafusion_expr::{lit, when, Expr, LogicalPlanBuilder}; use datafusion_physical_plan::ExecutionPlan; use futures::StreamExt; use object_store::prefix::PrefixStore; @@ -24,91 +24,17 @@ use crate::operations::writer::{DeltaWriter, WriterConfig}; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::table::Constraint as DeltaConstraint; +use crate::DeltaTableError; -use super::configs::WriterStatsConfig; -use super::WriteError; +use arrow::compute::concat_batches; +use arrow_schema::Schema; +use datafusion::catalog::TableProvider; +use datafusion::datasource::MemTable; +use datafusion::execution::context::SessionContext; +use datafusion_expr::col; -#[allow(clippy::too_many_arguments)] -pub(crate) async fn write_execution_plan_with_predicate( - predicate: Option, - snapshot: Option<&DeltaTableState>, - state: SessionState, - plan: Arc, - partition_columns: Vec, - object_store: ObjectStoreRef, - target_file_size: Option, - write_batch_size: Option, - writer_properties: Option, - writer_stats_config: WriterStatsConfig, -) -> DeltaResult> { - // We always take the plan Schema since the data may contain Large/View arrow types, - // the schema and batches were prior constructed with this in mind. - let schema: ArrowSchemaRef = plan.schema(); - let checker = if let Some(snapshot) = snapshot { - DeltaDataChecker::new(snapshot) - } else { - debug!("Using plan schema to derive generated columns, since no snapshot was provided. Implies first write."); - let delta_schema: StructType = schema.as_ref().try_into()?; - DeltaDataChecker::new_with_generated_columns( - delta_schema.get_generated_columns().unwrap_or_default(), - ) - }; - let checker = match predicate { - Some(pred) => { - // TODO: get the name of the outer-most column? `*` will also work but would it be slower? 
- let chk = DeltaConstraint::new("*", &fmt_expr_to_sql(&pred)?); - checker.with_extra_constraints(vec![chk]) - } - _ => checker, - }; - // Write data to disk - let mut tasks = vec![]; - for i in 0..plan.properties().output_partitioning().partition_count() { - let inner_plan = plan.clone(); - let inner_schema = schema.clone(); - let task_ctx = Arc::new(TaskContext::from(&state)); - let config = WriterConfig::new( - inner_schema.clone(), - partition_columns.clone(), - writer_properties.clone(), - target_file_size, - write_batch_size, - writer_stats_config.num_indexed_cols, - writer_stats_config.stats_columns.clone(), - ); - let mut writer = DeltaWriter::new(object_store.clone(), config); - let checker_stream = checker.clone(); - let mut stream = inner_plan.execute(i, task_ctx)?; - - let handle: tokio::task::JoinHandle>> = - tokio::task::spawn(async move { - while let Some(maybe_batch) = stream.next().await { - let batch = maybe_batch?; - checker_stream.check_batch(&batch).await?; - writer.write(&batch).await?; - } - let add_actions = writer.close().await; - match add_actions { - Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::>()), - Err(err) => Err(err), - } - }); - - tasks.push(handle); - } - let actions = futures::future::join_all(tasks) - .await - .into_iter() - .collect::, _>>() - .map_err(|err| WriteError::WriteTask { source: err })? - .into_iter() - .collect::, _>>()? - .concat() - .into_iter() - .collect::>(); - // Collect add actions to add to commit - Ok(actions) -} +use crate::operations::cdc::CDC_COLUMN_NAME; +use crate::operations::write::{WriteError, WriterStatsConfig}; #[allow(clippy::too_many_arguments)] pub(crate) async fn write_execution_plan_cdc( @@ -169,8 +95,7 @@ pub(crate) async fn write_execution_plan( writer_properties: Option, writer_stats_config: WriterStatsConfig, ) -> DeltaResult> { - write_execution_plan_with_predicate( - None, + write_execution_plan_v2( snapshot, state, plan, @@ -180,6 +105,8 @@ pub(crate) async fn write_execution_plan( write_batch_size, writer_properties, writer_stats_config, + None, + false, ) .await } @@ -195,9 +122,8 @@ pub(crate) async fn execute_non_empty_expr( writer_properties: Option, writer_stats_config: WriterStatsConfig, partition_scan: bool, - insert_df: DataFrame, operation_id: Uuid, -) -> DeltaResult> { +) -> DeltaResult<(Vec, Option)> { // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. 
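+    // The second element of the returned tuple is an optional CDF DataFrame: the existing rows
+    // that match the predicate, tagged with `_change_type = "delete"`. It is `Some` only when
+    // change data feed is enabled and this was not a partition-only scan; the caller unions it
+    // with the incoming data (tagged `insert`) and writes both through `write_execution_plan_v2`.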
let mut actions: Vec = Vec::new(); @@ -219,7 +145,7 @@ pub(crate) async fn execute_non_empty_expr( let df = DataFrame::new(state.clone(), source); - if !partition_scan { + let cdf_df = if !partition_scan { // Apply the negation of the filter and rewrite files let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone())))); @@ -243,79 +169,23 @@ pub(crate) async fn execute_non_empty_expr( .await?; actions.extend(add_actions); - } - - // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column - // Only write when CDC actions when it was not a partition scan, load_cdf can deduce the deletes in that case - // based on the remove actions if a partition got deleted - if !partition_scan { - // We only write deletions when it was not a partition scan - if let Some(cdc_actions) = execute_non_empty_expr_cdc( - snapshot, - log_store, - state.clone(), - df, - expression, - partition_columns, - writer_properties, - writer_stats_config, - insert_df, - operation_id, - ) - .await? - { - actions.extend(cdc_actions) - } - } - Ok(actions) -} - -/// If CDC is enabled it writes all the deletions based on predicate into _change_data directory -#[allow(clippy::too_many_arguments)] -pub(crate) async fn execute_non_empty_expr_cdc( - snapshot: &DeltaTableState, - log_store: LogStoreRef, - state: SessionState, - scan: DataFrame, - expression: &Expr, - table_partition_cols: Vec, - writer_properties: Option, - writer_stats_config: WriterStatsConfig, - insert_df: DataFrame, - operation_id: Uuid, -) -> DeltaResult>> { - match should_write_cdc(snapshot) { - // Create CDC scan - Ok(true) => { - let filter = scan.clone().filter(expression.clone())?; - - // Add literal column "_change_type" - let delete_change_type_expr = lit("delete").alias("_change_type"); - - let insert_change_type_expr = lit("insert").alias("_change_type"); - - let delete_df = filter.with_column("_change_type", delete_change_type_expr)?; - - let insert_df = insert_df.with_column("_change_type", insert_change_type_expr)?; - - let cdc_df = delete_df.union(insert_df)?; - let cdc_actions = write_execution_plan_cdc( - Some(snapshot), - state.clone(), - cdc_df.create_physical_plan().await?, - table_partition_cols.clone(), - log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size() as usize), - None, - writer_properties, - writer_stats_config, + // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column + // Only write when CDC actions when it was not a partition scan, load_cdf can deduce the deletes in that case + // based on the remove actions if a partition got deleted + if should_write_cdc(snapshot)? { + Some( + df.filter(expression.clone())? 
+ .with_column(CDC_COLUMN_NAME, lit("delete"))?, ) - .await?; - Ok(Some(cdc_actions)) + } else { + None } - _ => Ok(None), - } + } else { + None + }; + + Ok((actions, cdf_df)) } // This should only be called with a valid predicate @@ -329,13 +199,12 @@ pub(crate) async fn prepare_predicate_actions( writer_properties: Option, deletion_timestamp: i64, writer_stats_config: WriterStatsConfig, - insert_df: DataFrame, operation_id: Uuid, -) -> DeltaResult> { +) -> DeltaResult<(Vec, Option)> { let candidates = find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?; - let mut actions = execute_non_empty_expr( + let (mut actions, cdf_df) = execute_non_empty_expr( snapshot, log_store, state, @@ -345,7 +214,6 @@ pub(crate) async fn prepare_predicate_actions( writer_properties, writer_stats_config, candidates.partition_scan, - insert_df, operation_id, ) .await?; @@ -366,5 +234,220 @@ pub(crate) async fn prepare_predicate_actions( default_row_commit_version: action.default_row_commit_version, })) } + Ok((actions, cdf_df)) +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn write_execution_plan_v2( + snapshot: Option<&DeltaTableState>, + state: SessionState, + plan: Arc, + partition_columns: Vec, + object_store: ObjectStoreRef, + target_file_size: Option, + write_batch_size: Option, + writer_properties: Option, + writer_stats_config: WriterStatsConfig, + predicate: Option, + contains_cdc: bool, +) -> DeltaResult> { + // We always take the plan Schema since the data may contain Large/View arrow types, + // the schema and batches were prior constructed with this in mind. + let schema: ArrowSchemaRef = plan.schema(); + let mut checker = if let Some(snapshot) = snapshot { + DeltaDataChecker::new(snapshot) + } else { + debug!("Using plan schema to derive generated columns, since no snapshot was provided. 
Implies first write."); + let delta_schema: StructType = schema.as_ref().try_into()?; + DeltaDataChecker::new_with_generated_columns( + delta_schema.get_generated_columns().unwrap_or_default(), + ) + }; + + if let Some(mut pred) = predicate { + if contains_cdc { + pred = when(col(CDC_COLUMN_NAME).eq(lit("insert")), pred).otherwise(lit(true))?; + } + let chk = DeltaConstraint::new("*", &fmt_expr_to_sql(&pred)?); + checker = checker.with_extra_constraints(vec![chk]) + } + + // Write data to disk + let mut tasks = vec![]; + if !contains_cdc { + for i in 0..plan.properties().output_partitioning().partition_count() { + let inner_plan = plan.clone(); + let inner_schema = schema.clone(); + let task_ctx = Arc::new(TaskContext::from(&state)); + let config = WriterConfig::new( + inner_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + let mut writer = DeltaWriter::new(object_store.clone(), config); + let checker_stream = checker.clone(); + let mut stream = inner_plan.execute(i, task_ctx)?; + + let handle: tokio::task::JoinHandle>> = + tokio::task::spawn(async move { + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + checker_stream.check_batch(&batch).await?; + writer.write(&batch).await?; + } + let add_actions = writer.close().await; + match add_actions { + Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::>()), + Err(err) => Err(err), + } + }); + tasks.push(handle); + } + } else { + // Incoming plan contains the normal write_plan unioned with the cdf plan + // we split these batches during the write + let cdf_store = Arc::new(PrefixStore::new(object_store.clone(), "_change_data")); + for i in 0..plan.properties().output_partitioning().partition_count() { + let inner_plan = plan.clone(); + let write_schema = Arc::new(Schema::new( + schema + .clone() + .fields() + .into_iter() + .filter_map(|f| { + if f.name() != CDC_COLUMN_NAME { + Some(f.as_ref().clone()) + } else { + None + } + }) + .collect::>(), + )); + let cdf_schema = schema.clone(); + let task_ctx = Arc::new(TaskContext::from(&state)); + let normal_config = WriterConfig::new( + write_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + + let cdf_config = WriterConfig::new( + cdf_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + + let mut writer = DeltaWriter::new(object_store.clone(), normal_config); + + let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config); + + let checker_stream = checker.clone(); + let mut stream = inner_plan.execute(i, task_ctx)?; + + let session_context = SessionContext::new(); + + let handle: tokio::task::JoinHandle>> = + tokio::task::spawn(async move { + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + + // split batch since we unioned upstream the operation write and cdf plan + let table_provider: Arc = Arc::new(MemTable::try_new( + batch.schema(), + vec![vec![batch.clone()]], + )?); + let batch_df = session_context.read_table(table_provider).unwrap(); + + let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list( + vec![lit("delete"), lit("source_delete"), 
lit("update_preimage")], + true, + ))?; + + let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list( + vec![ + lit("delete"), + lit("insert"), + lit("update_preimage"), + lit("update_postimage"), + ], + false, + ))?; + + // Concatenate with the CDF_schema, since we need to keep the _change_type col + let mut normal_batch = + concat_batches(&cdf_schema, &normal_df.collect().await?)?; + checker_stream.check_batch(&normal_batch).await?; + + // Drop the CDC_COLUMN ("_change_type") + let mut idx: Option = None; + for (i, field) in normal_batch.schema_ref().fields().iter().enumerate() { + if field.name() == CDC_COLUMN_NAME { + idx = Some(i); + break; + } + } + + normal_batch.remove_column(idx.ok_or(DeltaTableError::generic( + "idx of _change_type col not found. This shouldn't have happened.", + ))?); + + let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?; + checker_stream.check_batch(&cdf_batch).await?; + + writer.write(&normal_batch).await?; + cdf_writer.write(&cdf_batch).await?; + } + let mut add_actions = writer + .close() + .await? + .into_iter() + .map(Action::Add) + .collect::>(); + let cdf_actions = cdf_writer.close().await.map(|v| { + v.into_iter() + .map(|add| { + { + Action::Cdc(AddCDCFile { + // This is a gnarly hack, but the action needs the nested path, not the + // path isnide the prefixed store + path: format!("_change_data/{}", add.path), + size: add.size, + partition_values: add.partition_values, + data_change: false, + tags: add.tags, + }) + } + }) + .collect::>() + })?; + add_actions.extend(cdf_actions); + Ok(add_actions) + }); + tasks.push(handle); + } + } + let actions = futures::future::join_all(tasks) + .await + .into_iter() + .collect::, _>>() + .map_err(|err| WriteError::WriteTask { source: err })? + .into_iter() + .collect::, _>>()? 
+ .concat() + .into_iter() + .collect::>(); + // Collect add actions to add to commit Ok(actions) } diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 02ef8b94ef..81742191c6 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -47,13 +47,14 @@ use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::prelude::DataFrame; use datafusion_common::{Column, DFSchema, Result, ScalarValue}; use datafusion_expr::{cast, lit, try_cast, Expr, Extension, LogicalPlan}; -use execution::{prepare_predicate_actions, write_execution_plan_with_predicate}; +use execution::{prepare_predicate_actions, write_execution_plan_v2}; use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use schema_evolution::try_cast_schema; use serde::{Deserialize, Serialize}; use tracing::log::*; +use super::cdc::CDC_COLUMN_NAME; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL}; use super::{CreateBuilder, CustomExecuteHandler, Operation}; @@ -542,7 +543,7 @@ impl std::future::IntoFuture for WriteBuilder { }), }); - let source = DataFrame::new(state.clone(), source); + let mut source = DataFrame::new(state.clone(), source); let schema = Arc::new(source.schema().as_arrow().clone()); @@ -600,33 +601,7 @@ impl std::future::IntoFuture for WriteBuilder { stats_columns, }; - let source_plan = source.clone().create_physical_plan().await?; - - // Here we need to validate if the new data conforms to a predicate if one is provided - let add_actions = write_execution_plan_with_predicate( - predicate.clone(), - this.snapshot.as_ref(), - state.clone(), - source_plan.clone(), - partition_columns.clone(), - this.log_store.object_store(Some(operation_id)).clone(), - target_file_size, - this.write_batch_size, - this.writer_properties.clone(), - writer_stats_config.clone(), - ) - .await?; - - let source_count = - find_metric_node(SOURCE_COUNT_ID, &source_plan).ok_or_else(|| { - DeltaTableError::Generic("Unable to locate expected metric node".into()) - })?; - let source_count_metrics = source_count.metrics().unwrap(); - let num_added_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC); - metrics.num_added_rows = num_added_rows; - - metrics.num_added_files = add_actions.len(); - actions.extend(add_actions); + let mut contains_cdc = false; // Collect remove actions if we are overwriting the table if let Some(snapshot) = &this.snapshot { @@ -658,21 +633,28 @@ impl std::future::IntoFuture for WriteBuilder { .unwrap() .as_millis() as i64; - match predicate { + match &predicate { Some(pred) => { - let predicate_actions = prepare_predicate_actions( - pred, + let (predicate_actions, cdf_df) = prepare_predicate_actions( + pred.clone(), this.log_store.clone(), snapshot, - state, + state.clone(), partition_columns.clone(), - this.writer_properties, + this.writer_properties.clone(), deletion_timestamp, - writer_stats_config, - source, + writer_stats_config.clone(), operation_id, ) .await?; + + if let Some(cdf_df) = cdf_df { + contains_cdc = true; + source = source + .with_column(CDC_COLUMN_NAME, lit("insert"))? 
+ .union(cdf_df)?; + } + if !predicate_actions.is_empty() { actions.extend(predicate_actions); } @@ -692,6 +674,35 @@ impl std::future::IntoFuture for WriteBuilder { .count(); } + let source_plan = source.clone().create_physical_plan().await?; + + // Here we need to validate if the new data conforms to a predicate if one is provided + let add_actions = write_execution_plan_v2( + this.snapshot.as_ref(), + state.clone(), + source_plan.clone(), + partition_columns.clone(), + this.log_store.object_store(Some(operation_id)).clone(), + target_file_size, + this.write_batch_size, + this.writer_properties, + writer_stats_config.clone(), + predicate.clone(), + contains_cdc, + ) + .await?; + + let source_count = + find_metric_node(SOURCE_COUNT_ID, &source_plan).ok_or_else(|| { + DeltaTableError::Generic("Unable to locate expected metric node".into()) + })?; + let source_count_metrics = source_count.metrics().unwrap(); + let num_added_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC); + metrics.num_added_rows = num_added_rows; + + metrics.num_added_files = add_actions.len(); + actions.extend(add_actions); + metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; diff --git a/python/src/lib.rs b/python/src/lib.rs index d1ff324a23..7ec61aa118 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -66,6 +66,10 @@ use error::DeltaError; use futures::future::join_all; use tracing::log::*; +use crate::writer::to_lazy_table; +use deltalake::datafusion::datasource::provider_as_source; +use deltalake::datafusion::logical_expr::LogicalPlanBuilder; + use crate::error::DeltaProtocolError; use crate::error::PythonError; use crate::features::TableFeatures; @@ -2165,32 +2169,30 @@ fn write_to_deltalake( .map_err(PythonError::from)? }; - let dont_be_so_lazy = match table.0.state.as_ref() { - Some(state) => state.table_config().enable_change_data_feed(), - // You don't have state somehow, so I guess it's okay to be lazy. - _ => false, - }; + // let dont_be_so_lazy = match table.0.state.as_ref() { + // Some(state) => state.table_config().enable_change_data_feed() && predicate.is_some(), + // // You don't have state somehow, so I guess it's okay to be lazy. + // _ => false, + // }; let mut builder = WriteBuilder::new(table.0.log_store(), table.0.state).with_save_mode(save_mode); - if dont_be_so_lazy { - debug!( - "write_to_deltalake() is not able to lazily perform a write, collecting batches" - ); - builder = builder.with_input_batches(data.0.map(|batch| batch.unwrap())); - } else { - use crate::writer::to_lazy_table; - use deltalake::datafusion::datasource::provider_as_source; - use deltalake::datafusion::logical_expr::LogicalPlanBuilder; - let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?; + // if dont_be_so_lazy { + // debug!( + // "write_to_deltalake() is not able to lazily perform a write, collecting batches" + // ); + // builder = builder.with_input_batches(data.0.map(|batch| batch.unwrap())); + // } else { - let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None) - .map_err(PythonError::from)? - .build() - .map_err(PythonError::from)?; - builder = builder.with_input_execution_plan(Arc::new(plan)); - } + let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?; + + let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None) + .map_err(PythonError::from)? 
+        .build()
+        .map_err(PythonError::from)?;
+    builder = builder.with_input_execution_plan(Arc::new(plan));
+    // }

     if let Some(schema_mode) = schema_mode {
         builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
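
Note: below is a minimal, crate-internal caller sketch of the consolidated `write_execution_plan_v2` entry point, mirroring how `write_execution_plan` delegates to it in this patch. Variable names (`snapshot`, `state`, `plan`, `log_store`, and so on) are placeholders for whatever the calling operation already has in scope; the argument order follows the signature added in `crates/core/src/operations/write/execution.rs`.

```rust
// Sketch only: a plain (non-CDC) write through the unified writer path.
let add_actions = write_execution_plan_v2(
    Some(&snapshot),                             // table state for invariant/constraint checks
    state.clone(),                               // DataFusion SessionState
    plan.clone(),                                // physical plan producing the rows to write
    partition_columns.clone(),
    log_store.object_store(Some(operation_id)),
    target_file_size,
    write_batch_size,
    writer_properties.clone(),
    writer_stats_config.clone(),
    None,  // predicate: no extra constraint to enforce on incoming rows
    false, // contains_cdc: the plan carries no unioned `_change_type` column
)
.await?;
```

When `contains_cdc` is true, the plan is expected to be the normal write plan unioned with the CDF plan; each batch is split on `_change_type`, that column is dropped for the main data files, and the change rows are written under `_change_data/` through a `PrefixStore`. If a predicate is supplied in that mode, it is wrapped as `when(col("_change_type").eq(lit("insert")), predicate).otherwise(lit(true))` so the constraint is enforced only on newly inserted rows.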