Commit

refactor(types): remove type_name and sub_fields from Field
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao committed Feb 17, 2025
1 parent bd2c2a9 commit e8ad2da
Showing 19 changed files with 61 additions and 193 deletions.
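After this change a `Field` carries only its data type and name: struct layout is no longer duplicated in `sub_fields` (it remains recoverable from `DataType::Struct`), and `type_name` is dropped pending deprecation of the corresponding `ColumnDesc` fields. A condensed sketch of the resulting definition, assembled from the schema.rs diff below (derive attributes and doc comments omitted):

pub struct Field {
    pub data_type: DataType,
    pub name: String,
}

impl Field {
    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
        Self {
            data_type,
            name: name.into(),
        }
    }
}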
8 changes: 2 additions & 6 deletions src/common/src/catalog/column.rs
@@ -295,12 +295,8 @@ impl ColumnDesc {
data_type: field.data_type.clone(),
column_id: ColumnId::new(id),
name: field.name.clone(),
field_descs: field
.sub_fields
.iter()
.map(Self::from_field_without_column_id)
.collect_vec(),
type_name: field.type_name.clone(),
field_descs: Vec::new(), // TODO: deprecate this
type_name: String::new(), // TODO: deprecate this
description: None,
generated_or_default_column: None,
additional_column: AdditionalColumn { column_type: None },
44 changes: 1 addition & 43 deletions src/common/src/catalog/schema.rs
@@ -14,7 +14,6 @@

use std::ops::Index;

use itertools::Itertools;
use risingwave_pb::plan_common::{PbColumnDesc, PbField};

use super::ColumnDesc;
@@ -27,20 +26,13 @@ use crate::util::iter_util::ZipEqFast;
pub struct Field {
pub data_type: DataType,
pub name: String,
/// For STRUCT type.
pub sub_fields: Vec<Field>,
/// The user-defined type's name, when the type is created from a protobuf schema file,
/// this field will store the message name.
pub type_name: String,
}

impl Field {
pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}
}
@@ -65,8 +57,6 @@ impl From<&ColumnDesc> for Field {
Self {
data_type: desc.data_type.clone(),
name: desc.name.clone(),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}
@@ -76,12 +66,6 @@ impl From<ColumnDesc> for Field {
Self {
data_type: column_desc.data_type,
name: column_desc.name,
sub_fields: column_desc
.field_descs
.into_iter()
.map(Into::into)
.collect(),
type_name: column_desc.type_name,
}
}
}
@@ -91,8 +75,6 @@ impl From<&PbColumnDesc> for Field {
Self {
data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
name: pb_column_desc.name.clone(),
sub_fields: pb_column_desc.field_descs.iter().map(Into::into).collect(),
type_name: pb_column_desc.type_name.clone(),
}
}
}
@@ -236,41 +218,21 @@ impl Schema {
}

impl Field {
// TODO: rename to `new`
pub fn with_name<S>(data_type: DataType, name: S) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}

pub fn with_struct<S>(
data_type: DataType,
name: S,
sub_fields: Vec<Field>,
type_name: S,
) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields,
type_name: type_name.into(),
}
}

pub fn unnamed(data_type: DataType) -> Self {
Self {
data_type,
name: String::new(),
sub_fields: vec![],
type_name: String::new(),
}
}

@@ -282,8 +244,6 @@ impl Field {
Self {
data_type: desc.data_type.clone(),
name: format!("{}.{}", table_name, desc.name),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}
@@ -293,8 +253,6 @@ impl From<&PbField> for Field {
Self {
data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
name: prost_field.get_name().clone(),
sub_fields: vec![],
type_name: String::new(),
}
}
}
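With `sub_fields` gone, code that used to walk `field.sub_fields` reads the struct layout from the field's data type instead. A minimal sketch of that pattern (the import paths and the `StructType` iterator are assumptions, inferred from how `StructType::new` is used elsewhere in this diff):

use risingwave_common::catalog::Field;
use risingwave_common::types::DataType;

/// Derive one `Field` per child of a struct-typed field, recovering the
/// information that was previously stored in the removed `sub_fields`.
fn children_of(field: &Field) -> Vec<Field> {
    match &field.data_type {
        // Assumes `StructType::iter()` yields `(name, data_type)` pairs.
        DataType::Struct(st) => st
            .iter()
            .map(|(name, ty)| Field::with_name(ty.clone(), name))
            .collect(),
        _ => Vec::new(),
    }
}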
13 changes: 7 additions & 6 deletions src/connector/src/sink/big_query.rs
@@ -376,12 +376,13 @@ impl BigQuerySink {
)))
}
DataType::Struct(_) => {
let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len());
for rw_field in &rw_field.sub_fields {
let field = Self::map_field(rw_field)?;
sub_fields.push(field)
}
TableFieldSchema::record(&rw_field.name, sub_fields)
todo!()
// let mut sub_fields = Vec::with_capacity(rw_field.sub_fields.len());
// for rw_field in &rw_field.sub_fields {
// let field = Self::map_field(rw_field)?;
// sub_fields.push(field)
// }
// TableFieldSchema::record(&rw_field.name, sub_fields)
}
DataType::List(dt) => {
let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
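The `todo!()` in the struct arm above marks mapping logic that still has to be rebuilt without `rw_field.sub_fields`. One possible shape for it (an illustrative sketch, not part of this commit) derives the sub-fields from the struct's data type and keeps the original `TableFieldSchema::record` call:

// Hypothetical replacement for the `todo!()` arm; assumes `StructType` exposes
// `len()` and an `iter()` over `(name, data_type)` pairs.
DataType::Struct(st) => {
    let mut sub_fields = Vec::with_capacity(st.len());
    for (name, ty) in st.iter() {
        let field = Self::map_field(&Field::with_name(ty.clone(), name))?;
        sub_fields.push(field);
    }
    TableFieldSchema::record(&rw_field.name, sub_fields)
}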
25 changes: 13 additions & 12 deletions src/connector/src/sink/clickhouse.rs
@@ -1073,18 +1073,19 @@ pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
let mut vec = vec![];
for field in schema.fields() {
if matches!(field.data_type, DataType::Struct(_)) {
for i in &field.sub_fields {
if matches!(i.data_type, DataType::Struct(_)) {
return Err(SinkError::ClickHouse(
"Only one level of nesting is supported for struct".to_owned(),
));
} else {
vec.push((
format!("{}.{}", field.name, i.name),
DataType::List(Box::new(i.data_type())),
))
}
}
// for i in &field.sub_fields {
// if matches!(i.data_type, DataType::Struct(_)) {
// return Err(SinkError::ClickHouse(
// "Only one level of nesting is supported for struct".to_owned(),
// ));
// } else {
// vec.push((
// format!("{}.{}", field.name, i.name),
// DataType::List(Box::new(i.data_type())),
// ))
// }
// }
todo!()
} else {
vec.push((field.name.clone(), field.data_type()));
}
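Likewise, the ClickHouse sink's struct branch is left as `todo!()`. A sketch of how the nested-column flattening could be restored from the data type alone, keeping the old single-level-of-nesting restriction (an assumption for illustration, not part of this commit; `field` and `vec` are the variables from the surrounding loop):

// Hypothetical rewrite of the struct branch: derive the nested columns from
// the struct's data type instead of the removed `field.sub_fields`.
if let DataType::Struct(st) = &field.data_type {
    for (name, ty) in st.iter() {
        if matches!(ty, DataType::Struct(_)) {
            return Err(SinkError::ClickHouse(
                "Only one level of nesting is supported for struct".to_owned(),
            ));
        }
        vec.push((
            format!("{}.{}", field.name, name),
            DataType::List(Box::new(ty.clone())),
        ));
    }
}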
6 changes: 2 additions & 4 deletions src/connector/src/sink/deltalake.rs
@@ -616,14 +616,12 @@ mod test {
Field {
data_type: DataType::Int32,
name: "id".into(),
sub_fields: vec![],
type_name: "".into(),

},
Field {
data_type: DataType::Varchar,
name: "name".into(),
sub_fields: vec![],
type_name: "".into(),

},
]);

36 changes: 0 additions & 36 deletions src/connector/src/sink/encoder/json.rs
@@ -445,8 +445,6 @@ mod tests {
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};

let config = JsonEncoderConfig {
@@ -724,8 +722,6 @@ mod tests {
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};
let fields = vec![
Field {
@@ -786,38 +782,6 @@
),
])),
name: "v10".into(),
sub_fields: vec![
Field {
data_type: DataType::Timestamp,
name: "a".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Timestamptz,
name: "b".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Struct(StructType::new(vec![
("aa", DataType::Int64),
("bb", DataType::Float64),
])),
name: "c".into(),
sub_fields: vec![
Field {
data_type: DataType::Int64,
name: "aa".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Float64,
name: "bb".into(),
..mock_field.clone()
},
],
..mock_field.clone()
},
],
..mock_field.clone()
},
Field {
19 changes: 0 additions & 19 deletions src/connector/src/sink/formatter/debezium_json.rs
@@ -363,14 +363,10 @@ mod tests {
Field {
data_type: DataType::Int32,
name: "v1".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Float32,
name: "v2".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: StructType::new(vec![
@@ -379,21 +375,6 @@
])
.into(),
name: "v3".into(),
sub_fields: vec![
Field {
data_type: DataType::Int32,
name: "v4".into(),
sub_fields: vec![],
type_name: "".into(),
},
Field {
data_type: DataType::Float32,
name: "v5".into(),
sub_fields: vec![],
type_name: "".into(),
},
],
type_name: "".into(),
},
]);

6 changes: 2 additions & 4 deletions src/connector/src/sink/kafka.rs
@@ -752,14 +752,12 @@ mod test {
Field {
data_type: DataType::Int32,
name: "id".into(),
sub_fields: vec![],
type_name: "".into(),

},
Field {
data_type: DataType::Varchar,
name: "v2".into(),
sub_fields: vec![],
type_name: "".into(),

},
]);

12 changes: 4 additions & 8 deletions src/connector/src/sink/postgres.rs
@@ -602,14 +602,12 @@ mod tests {
Field {
data_type: DataType::Int32,
name: "a".to_owned(),
sub_fields: vec![],
type_name: "".to_owned(),

},
Field {
data_type: DataType::Int32,
name: "b".to_owned(),
sub_fields: vec![],
type_name: "".to_owned(),

},
]);
let table_name = "test_table";
@@ -626,14 +624,12 @@
Field {
data_type: DataType::Int32,
name: "a".to_owned(),
sub_fields: vec![],
type_name: "".to_owned(),

},
Field {
data_type: DataType::Int32,
name: "b".to_owned(),
sub_fields: vec![],
type_name: "".to_owned(),

},
]);
let table_name = "test_table";
12 changes: 4 additions & 8 deletions src/connector/src/sink/redis.rs
@@ -507,14 +507,12 @@ mod test {
Field {
data_type: DataType::Int32,
name: "id".to_owned(),
sub_fields: vec![],
type_name: "string".to_owned(),

},
Field {
data_type: DataType::Varchar,
name: "name".to_owned(),
sub_fields: vec![],
type_name: "string".to_owned(),

},
]);

@@ -580,14 +578,12 @@ mod test {
Field {
data_type: DataType::Int32,
name: "id".to_owned(),
sub_fields: vec![],
type_name: "string".to_owned(),

},
Field {
data_type: DataType::Varchar,
name: "name".to_owned(),
sub_fields: vec![],
type_name: "string".to_owned(),

},
]);

3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -676,8 +676,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
table_builder.add_column(&Field {
data_type: DataType::Int64,
name: format!("count_for_agg_call_{}", call_index),
sub_fields: vec![],
type_name: String::default(),

});
}
table_builder