Skip to content

Commit

Permalink
impl sub_fields
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Feb 17, 2025
1 parent e8ad2da commit 57f3298
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 23 deletions.
11 changes: 11 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,17 @@ impl Field {
name: format!("{}.{}", table_name, desc.name),
}
}

/// Get the sub fields if the data type is a struct, otherwise return an empty vector.
pub fn sub_fields(&self) -> Vec<Field> {
if let DataType::Struct(st) = &self.data_type {
st.iter()
.map(|(name, data_type)| Field::with_name(data_type.clone(), name))
.collect()
} else {
Vec::new()
}
}
}

impl From<&PbField> for Field {
Expand Down
14 changes: 7 additions & 7 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ impl BigQuerySink {
)))
}
DataType::Struct(_) => {
todo!()
// let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len());
// for rw_field in &rw_field.sub_fields {
// let field = Self::map_field(rw_field)?;
// sub_fields.push(field)
// }
// TableFieldSchema::record(&rw_field.name, sub_fields)
let rw_sub_fields = rw_field.sub_fields();
let mut sub_fields = Vec::with_capacity(rw_sub_fields.len());
for rw_field in &rw_sub_fields {
let field = Self::map_field(rw_field)?;
sub_fields.push(field);
}
TableFieldSchema::record(&rw_field.name, sub_fields)
}
DataType::List(dt) => {
let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
Expand Down
25 changes: 12 additions & 13 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,19 +1073,18 @@ pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String
let mut vec = vec![];
for field in schema.fields() {
if matches!(field.data_type, DataType::Struct(_)) {
// for i in &field.sub_fields {
// if matches!(i.data_type, DataType::Struct(_)) {
// return Err(SinkError::ClickHouse(
// "Only one level of nesting is supported for struct".to_owned(),
// ));
// } else {
// vec.push((
// format!("{}.{}", field.name, i.name),
// DataType::List(Box::new(i.data_type())),
// ))
// }
// }
todo!()
for i in &field.sub_fields() {
if matches!(i.data_type, DataType::Struct(_)) {
return Err(SinkError::ClickHouse(
"Only one level of nesting is supported for struct".to_owned(),
));
} else {
vec.push((
format!("{}.{}", field.name, i.name),
DataType::List(Box::new(i.data_type())),
))
}
}
} else {
vec.push((field.name.clone(), field.data_type()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
// Get field info from o2i.
let name = match o2i.try_map(idx) {
Some(input_idx) => input_schema.fields()[input_idx].name.clone(),
None => (format!("{:?}", ExprDisplay { expr, input_schema }),),
None => format!("{:?}", ExprDisplay { expr, input_schema }),
};
Field::with_name(expr.return_type(), name)
}));
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,6 @@ impl PlanTreeNodeUnary for StreamMaterialize {
.zip_eq_fast(self.base.schema().fields.iter())
.for_each(|(a, b)| {
assert_eq!(a.data_type, b.data_type);
assert_eq!(a.type_nameeee, b.type_nameeee);
assert_eq!(a.sub_fields, b.sub_fields);
});
assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
new
Expand Down

0 comments on commit 57f3298

Please sign in to comment.