diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 8a1a4a5072d6b..63be824b337b3 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -295,12 +295,8 @@ impl ColumnDesc { data_type: field.data_type.clone(), column_id: ColumnId::new(id), name: field.name.clone(), - field_descs: field - .sub_fields - .iter() - .map(Self::from_field_without_column_id) - .collect_vec(), - type_name: field.type_name.clone(), + field_descs: Vec::new(), // TODO: deprecate this + type_name: String::new(), // TODO: deprecate this description: None, generated_or_default_column: None, additional_column: AdditionalColumn { column_type: None }, diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index 3e7fc3d2fb4e1..fa7f9f5902aff 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -14,7 +14,6 @@ use std::ops::Index; -use itertools::Itertools; use risingwave_pb::plan_common::{PbColumnDesc, PbField}; use super::ColumnDesc; @@ -27,11 +26,6 @@ use crate::util::iter_util::ZipEqFast; pub struct Field { pub data_type: DataType, pub name: String, - /// For STRUCT type. - pub sub_fields: Vec<Field>, - /// The user-defined type's name, when the type is created from a protobuf schema file, - /// this field will store the message name. 
- pub type_name: String, } impl Field { @@ -39,8 +33,6 @@ impl Field { Self { data_type, name: name.into(), - sub_fields: vec![], - type_name: String::new(), } } } @@ -65,8 +57,6 @@ impl From<&ColumnDesc> for Field { Self { data_type: desc.data_type.clone(), name: desc.name.clone(), - sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(), - type_name: desc.type_name.clone(), } } } @@ -76,12 +66,6 @@ impl From<ColumnDesc> for Field { Self { data_type: column_desc.data_type, name: column_desc.name, - sub_fields: column_desc - .field_descs - .into_iter() - .map(Into::into) - .collect(), - type_name: column_desc.type_name, } } } @@ -91,8 +75,6 @@ impl From<&PbColumnDesc> for Field { Self { data_type: pb_column_desc.column_type.as_ref().unwrap().into(), name: pb_column_desc.name.clone(), - sub_fields: pb_column_desc.field_descs.iter().map(Into::into).collect(), - type_name: pb_column_desc.type_name.clone(), } } } @@ -236,6 +218,7 @@ impl Schema { } impl Field { + // TODO: rename to `new` pub fn with_name<S>(data_type: DataType, name: S) -> Self where S: Into<String>, { Self { data_type, name: name.into(), - sub_fields: vec![], - type_name: String::new(), - } - } - - pub fn with_struct<S>( - data_type: DataType, - name: S, - sub_fields: Vec<Field>, - type_name: S, - ) -> Self - where - S: Into<String>, - { - Self { - data_type, - name: name.into(), - sub_fields, - type_name: type_name.into(), } } @@ -269,8 +233,6 @@ impl Field { Self { data_type, name: String::new(), - sub_fields: vec![], - type_name: String::new(), } } @@ -282,8 +244,6 @@ impl Field { Self { data_type: desc.data_type.clone(), name: format!("{}.{}", table_name, desc.name), - sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(), - type_name: desc.type_name.clone(), } } } @@ -293,8 +253,6 @@ impl From<&PbField> for Field { Self { data_type: DataType::from(prost_field.get_data_type().expect("data type not found")), name: prost_field.get_name().clone(), - sub_fields: vec![], - type_name: 
String::new(), } } } diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 18d2b5694e5db..9b61f3e02e404 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -376,12 +376,13 @@ impl BigQuerySink { ))) } DataType::Struct(_) => { - let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len()); - for rw_field in &rw_field.sub_fields { - let field = Self::map_field(rw_field)?; - sub_fields.push(field) - } - TableFieldSchema::record(&rw_field.name, sub_fields) + todo!() + // let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len()); + // for rw_field in &rw_field.sub_fields { + // let field = Self::map_field(rw_field)?; + // sub_fields.push(field) + // } + // TableFieldSchema::record(&rw_field.name, sub_fields) } DataType::List(dt) => { let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?; diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 6a9c08330e58e..ea622c127cb99 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -1073,18 +1073,19 @@ pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result Agg { table_builder.add_column(&Field { data_type: DataType::Int64, name: format!("count_for_agg_call_{}", call_index), - sub_fields: vec![], - type_name: String::default(), + }); } table_builder diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs index 329bda8694fee..6422b5bd80834 100644 --- a/src/frontend/src/optimizer/plan_node/generic/now.rs +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -58,8 +58,7 @@ impl GenericPlanNode for Now { } else { "ts" }), - sub_fields: vec![], - type_name: String::default(), + }]) } diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index f807c4f579fb9..3bb3917cf346f 100644 
--- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -82,31 +82,28 @@ impl GenericPlanNode for Project { .enumerate() .map(|(i, expr)| { // Get field info from o2i. - let (name, sub_fields, type_name) = match o2i.try_map(i) { + let name = match o2i.try_map(i) { Some(input_idx) => { - let mut field = input_schema.fields()[input_idx].clone(); if let Some(name) = self.field_names.get(&i) { - field.name.clone_from(name); + name.clone() + } else { + input_schema.fields()[input_idx].name.clone() } - (field.name, field.sub_fields, field.type_name) } None => match expr { - ExprImpl::InputRef(_) | ExprImpl::Literal(_) => ( - format!("{:?}", ExprDisplay { expr, input_schema }), - vec![], - String::new(), - ), + ExprImpl::InputRef(_) | ExprImpl::Literal(_) => { + format!("{:?}", ExprDisplay { expr, input_schema }) + } _ => { - let name = if let Some(name) = self.field_names.get(&i) { + if let Some(name) = self.field_names.get(&i) { name.clone() } else { format!("$expr{}", ctx.next_expr_display_id()) - }; - (name, vec![], String::new()) + } } }, }; - Field::with_struct(expr.return_type(), name, sub_fields, type_name) + Field::with_name(expr.return_type(), name) }) .collect(); Schema { fields } diff --git a/src/frontend/src/optimizer/plan_node/generic/project_set.rs b/src/frontend/src/optimizer/plan_node/generic/project_set.rs index e3ca5efc99dd0..c823ee5618431 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project_set.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project_set.rs @@ -71,18 +71,11 @@ impl GenericPlanNode for ProjectSet { fields.extend(self.select_list.iter().enumerate().map(|(idx, expr)| { let idx = idx + 1; // Get field info from o2i. 
- let (name, sub_fields, type_name) = match o2i.try_map(idx) { - Some(input_idx) => { - let field = input_schema.fields()[input_idx].clone(); - (field.name, field.sub_fields, field.type_name) - } - None => ( - format!("{:?}", ExprDisplay { expr, input_schema }), - vec![], - String::new(), - ), + let name = match o2i.try_map(idx) { + Some(input_idx) => input_schema.fields()[input_idx].name.clone(), + None => format!("{:?}", ExprDisplay { expr, input_schema }), }; - Field::with_struct(expr.return_type(), name, sub_fields, type_name) + Field::with_name(expr.return_type(), name) })); Schema { fields } diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 2e5478a166552..741224097ef0a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -163,14 +163,12 @@ impl Source { let key = Field { data_type: DataType::Varchar, name: "partition_id".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let value = Field { data_type: DataType::Jsonb, name: "offset_info".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let ordered_col_idx = builder.add_column(&key); diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index acb9a23dac349..692abb5464b57 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -188,8 +188,7 @@ impl LogicalSource { &Field { name: "filename".to_owned(), data_type: DataType::Varchar, - sub_fields: vec![], - type_name: "".to_owned(), + }, 0, ), @@ -200,8 +199,7 @@ impl LogicalSource { &Field { name: "last_edit_time".to_owned(), data_type: DataType::Timestamptz, - sub_fields: vec![], - type_name: "".to_owned(), + }, 1, ), @@ -212,8 +210,7 @@ impl LogicalSource { &Field { name: "file_size".to_owned(), data_type: DataType::Int64, 
- sub_fields: vec![], - type_name: "".to_owned(), + }, 0, ), diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 6b9aa1ca43bef..f9115609f6443 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -382,7 +382,7 @@ impl PlanTreeNodeUnary for StreamMaterialize { .zip_eq_fast(self.base.schema().fields.iter()) .for_each(|(a, b)| { assert_eq!(a.data_type, b.data_type); - assert_eq!(a.type_name, b.type_name); assert_eq!(a.sub_fields, b.sub_fields); }); assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key()); diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 297dbae298142..98ac9814aca70 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -85,14 +85,12 @@ impl StreamSourceScan { let key = Field { data_type: DataType::Varchar, name: "partition_id".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let value = Field { data_type: DataType::Jsonb, name: "backfill_progress".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let ordered_col_idx = builder.add_column(&key); diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index 51fc0ed8d7f06..a091205c37936 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -122,14 +122,12 @@ pub fn infer_internal_table_catalog(watermark_type: DataType) -> TableCatalog { let key = Field { data_type: DataType::Int16, name: "vnode".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let value = Field { 
data_type: watermark_type, name: "offset".to_owned(), - sub_fields: vec![], - type_name: "".to_owned(), + }; let ordered_col_idx = builder.add_column(&key);