Skip to content

Commit

Permalink
fix(iceberg): iceberg partition by should specify field id and the partition spec id (#20517)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Feb 18, 2025
1 parent 3afbf25 commit 7167d86
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl IcebergSink {
}
};

let partition_fields = match &self.config.partition_by {
let partition_spec = match &self.config.partition_by {
Some(partition_field) => {
let mut partition_fields = Vec::<UnboundPartitionField>::new();
// captures column, transform(column), transform(n,column), transform(n, column)
Expand All @@ -330,7 +330,7 @@ impl IcebergSink {
bail!(format!("Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)", partition_field))
}
let caps = re.captures_iter(partition_field);
for mat in caps {
for (i, mat) in caps.enumerate() {
let (column, transform) =
if mat.name("n").is_none() && mat.name("field").is_none() {
(&mat["transform"], Transform::Identity)
Expand Down Expand Up @@ -360,6 +360,7 @@ impl IcebergSink {
.source_id(id)
.transform(transform)
.name(column.to_owned())
.field_id(i as i32)
.build(),
),
None => bail!(format!(
Expand All @@ -368,25 +369,32 @@ impl IcebergSink {
)),
};
}
partition_fields
Some(
UnboundPartitionSpec::builder()
.with_spec_id(0)
.add_partition_fields(partition_fields)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to add partition columns")?
.build(),
)
}
None => Vec::<UnboundPartitionField>::new(),
None => None,
};

let table_creation_builder = TableCreation::builder()
.name(self.config.common.table_name.clone())
.schema(iceberg_schema)
.partition_spec(
UnboundPartitionSpec::builder()
.add_partition_fields(partition_fields)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to add partition columns")?
.build(),
);

let table_creation = match location {
Some(location) => table_creation_builder.location(location).build(),
None => table_creation_builder.build(),
.schema(iceberg_schema);

let table_creation = match (location, partition_spec) {
(Some(location), Some(partition_spec)) => table_creation_builder
.location(location)
.partition_spec(partition_spec)
.build(),
(Some(location), None) => table_creation_builder.location(location).build(),
(None, Some(partition_spec)) => table_creation_builder
.partition_spec(partition_spec)
.build(),
(None, None) => table_creation_builder.build(),
};

catalog
Expand Down

0 comments on commit 7167d86

Please sign in to comment.