Skip to content

Commit

Permalink
refactor: simplify writer, schema evolution and generated columns
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Signed-off-by: Liam Brannigan <liambrannigan@Liams-MacBook-Pro.local>
  • Loading branch information
ion-elgreco authored and Liam Brannigan committed Feb 19, 2025
1 parent e5c95b7 commit e0077ff
Show file tree
Hide file tree
Showing 10 changed files with 714 additions and 836 deletions.
3 changes: 2 additions & 1 deletion crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ use crate::delta_datafusion::{
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::logstore::LogStoreRef;
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
use crate::operations::write::execution::{write_execution_plan, write_execution_plan_cdc};
use crate::operations::write::WriterStatsConfig;
use crate::operations::CustomExecuteHandler;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
Expand Down
75 changes: 5 additions & 70 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference};
use datafusion_expr::{col, conditional_expressions::CaseBuilder, lit, when, Expr, JoinType};
use datafusion_expr::{
ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,
UNNAMED_TABLE,
Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, UNNAMED_TABLE,
};

use delta_kernel::schema::{ColumnMetadataKey, StructType};
Expand All @@ -78,16 +77,18 @@ use crate::delta_datafusion::{
DeltaSessionConfig, DeltaTableProvider,
};

use crate::kernel::{Action, DataCheck, Metadata, StructTypeExt};
use crate::kernel::{Action, Metadata, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::operations::cast::merge_schema::{merge_arrow_field, merge_arrow_schema};
use crate::operations::cdc::*;
use crate::operations::merge::barrier::find_node;
use crate::operations::transaction::CommitBuilder;
use crate::operations::write::generated_columns::{
add_generated_columns, add_missing_generated_columns,
};
use crate::operations::write::WriterStatsConfig;
use crate::protocol::{DeltaOperation, MergePredicate};
use crate::table::state::DeltaTableState;
use crate::table::GeneratedColumn;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
use writer::write_execution_plan_v2;

Expand Down Expand Up @@ -776,72 +777,6 @@ async fn execute(
None => TableReference::bare(UNNAMED_TABLE),
};

/// Add generated column expressions to a dataframe
fn add_missing_generated_columns(
mut df: DataFrame,
generated_cols: &Vec<GeneratedColumn>,
) -> DeltaResult<(DataFrame, Vec<String>)> {
let mut missing_cols = vec![];
for generated_col in generated_cols {
let col_name = generated_col.get_name();

if df
.clone()
.schema()
.field_with_unqualified_name(col_name)
.is_err()
// implies it doesn't exist
{
debug!(
"Adding missing generated column {} in source as placeholder",
col_name
);
// If column doesn't exist, we add a null column, later we will generate the values after
// all the merge is projected.
// Other generated columns that were provided upon the start we only validate during write
missing_cols.push(col_name.to_string());
df = df
.clone()
.with_column(col_name, Expr::Literal(ScalarValue::Null))?;
}
}
Ok((df, missing_cols))
}

/// Add generated column expressions to a dataframe
fn add_generated_columns(
mut df: DataFrame,
generated_cols: &Vec<GeneratedColumn>,
generated_cols_missing_in_source: &[String],
state: &SessionState,
) -> DeltaResult<DataFrame> {
debug!("Generating columns in dataframe");
for generated_col in generated_cols {
// We only validate columns that were missing from the start. We don't update
// update generated columns that were provided during runtime
if !generated_cols_missing_in_source.contains(&generated_col.name) {
continue;
}

let generation_expr = state.create_logical_expr(
generated_col.get_generation_expression(),
df.clone().schema(),
)?;
let col_name = generated_col.get_name();

df = df.clone().with_column(
generated_col.get_name(),
when(col(col_name).is_null(), generation_expr)
.otherwise(col(col_name))?
.cast_to(
&arrow_schema::DataType::try_from(&generated_col.data_type)?,
df.schema(),
)?,
)?
}
Ok(df)
}

let generated_col_expressions = snapshot
.schema()
.get_generated_columns()
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use super::{
};
use super::{transaction::PROTOCOL, write::WriterStatsConfig};
use super::{
write::{write_execution_plan, write_execution_plan_cdc},
write::execution::{write_execution_plan, write_execution_plan_cdc},
CustomExecuteHandler, Operation,
};
use crate::delta_datafusion::{find_files, planner::DeltaPlanner, register_store};
Expand Down
18 changes: 18 additions & 0 deletions crates/core/src/operations/write/configs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/// Configuration for the writer on how to collect stats
#[derive(Clone)]
pub struct WriterStatsConfig {
/// Number of columns to collect stats for, idx based
pub num_indexed_cols: i32,
/// Optional list of columns which to collect stats for, takes precedende over num_index_cols
pub stats_columns: Option<Vec<String>>,
}

impl WriterStatsConfig {
/// Create new writer stats config
pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self {
Self {
num_indexed_cols,
stats_columns,
}
}
}
Loading

0 comments on commit e0077ff

Please sign in to comment.