From 57f3298a70a844b459b4e328980f3f0ba2393083 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 14 Feb 2025 14:06:55 +0800 Subject: [PATCH] impl sub_fields Signed-off-by: Bugen Zhao --- src/common/src/catalog/schema.rs | 11 ++++++++ src/connector/src/sink/big_query.rs | 14 +++++------ src/connector/src/sink/clickhouse.rs | 25 +++++++++---------- .../plan_node/generic/project_set.rs | 2 +- .../optimizer/plan_node/stream_materialize.rs | 2 -- 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index fa7f9f5902aff..baf825b05cdb6 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -246,6 +246,17 @@ impl Field { name: format!("{}.{}", table_name, desc.name), } } + + /// Get the sub fields if the data type is a struct, otherwise return an empty vector. + pub fn sub_fields(&self) -> Vec { + if let DataType::Struct(st) = &self.data_type { + st.iter() + .map(|(name, data_type)| Field::with_name(data_type.clone(), name)) + .collect() + } else { + Vec::new() + } + } } impl From<&PbField> for Field { diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 9b61f3e02e404..aaec375d51866 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -376,13 +376,13 @@ impl BigQuerySink { ))) } DataType::Struct(_) => { - todo!() - // let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len()); - // for rw_field in &rw_field.sub_fields { - // let field = Self::map_field(rw_field)?; - // sub_fields.push(field) - // } - // TableFieldSchema::record(&rw_field.name, sub_fields) + let rw_sub_fields = rw_field.sub_fields(); + let mut sub_fields = Vec::with_capacity(rw_sub_fields.len()); + for rw_field in &rw_sub_fields { + let field = Self::map_field(rw_field)?; + sub_fields.push(field); + } + TableFieldSchema::record(&rw_field.name, sub_fields) } DataType::List(dt) => { let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?; diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index ea622c127cb99..808e12ccbced4 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -1073,19 +1073,18 @@ pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result GenericPlanNode for ProjectSet { // Get field info from o2i. let name = match o2i.try_map(idx) { Some(input_idx) => input_schema.fields()[input_idx].name.clone(), - None => (format!("{:?}", ExprDisplay { expr, input_schema }),), + None => format!("{:?}", ExprDisplay { expr, input_schema }), }; Field::with_name(expr.return_type(), name) })); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index f9115609f6443..5794d4a794c37 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -382,8 +382,6 @@ impl PlanTreeNodeUnary for StreamMaterialize { .zip_eq_fast(self.base.schema().fields.iter()) .for_each(|(a, b)| { assert_eq!(a.data_type, b.data_type); - assert_eq!(a.type_nameeee, b.type_nameeee); - assert_eq!(a.sub_fields, b.sub_fields); }); assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key()); new