diff --git a/.github/workflows/presto-cdc-test.yml b/.github/workflows/presto-cdc-test.yml index aa747b4a0..3d6f6ba4e 100644 --- a/.github/workflows/presto-cdc-test.yml +++ b/.github/workflows/presto-cdc-test.yml @@ -143,7 +143,7 @@ jobs: python3 3_add_column.py python3 delete_data.py docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh - sleep 30s + sleep 60s - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark @@ -156,7 +156,7 @@ jobs: run: | cd ./script/benchmark python3 4_update_data.py - sleep 30s + sleep 60s - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark @@ -171,7 +171,7 @@ jobs: python3 6_drop_column.py python3 delete_data.py docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh - sleep 30s + sleep 60s - name: "[Check] Mysql cdc data accuracy verification task" run: | cd ./script/benchmark diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 0150ebaec..0b9a8d954 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -795,6 +795,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e366bff8cd32dd8754b0991fb66b279dc48f598c3a18914852a6673deef583" +dependencies = [ + "quote", + "syn 2.0.38", +] + [[package]] name = "dary_heap" version = "0.3.6" @@ -1530,17 +1540,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "lakesoul-datafusion" version = "0.1.0" dependencies = [ "async-trait", "chrono", + "ctor", "futures", + "json", "lakesoul-io", "lakesoul-metadata", "prost", "proto", + "serde", "uuid", ] diff --git a/rust/lakesoul-datafusion/Cargo.toml b/rust/lakesoul-datafusion/Cargo.toml index 3a5d91c62..4162a94db 100644 --- a/rust/lakesoul-datafusion/Cargo.toml +++ b/rust/lakesoul-datafusion/Cargo.toml @@ -17,4 +17,9 @@ prost = "0.11" async-trait = "0.1" futures = "0.3" uuid = { version = "1.4.0", features = ["v4", "fast-rng", "macro-diagnostics"]} -chrono = { version = "0.4", features = ["unstable-locales"] } \ No newline at end of file +chrono = { version = "0.4", features = ["unstable-locales"] } +json = "0.12" +serde = { version = "1.0", features = ["derive", "std", "rc"]} + +[dev-dependencies] +ctor = "0.2" \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/catalog/lakesoul_sink.rs b/rust/lakesoul-datafusion/src/catalog/lakesoul_sink.rs new file mode 100644 index 000000000..e5b740990 --- /dev/null +++ b/rust/lakesoul-datafusion/src/catalog/lakesoul_sink.rs @@ -0,0 +1,351 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + + +use std::any::Any; +use std::fmt::{self, Debug}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::SystemTime; + +use lakesoul_io::lakesoul_reader::ArrowError; +use lakesoul_io::{arrow, datafusion}; + +use arrow::array::{UInt64Array, ArrayRef}; +use arrow::record_batch::RecordBatch; +use arrow::datatypes::{Field, DataType}; +use async_trait::async_trait; + +use datafusion::common::{Statistics, DataFusionError}; + +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{ + Expr, TableType, +}; +use 
datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
+use datafusion::physical_plan::insert::DataSink;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Distribution, stream::RecordBatchStreamAdapter
+};
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
+
+use futures::StreamExt;
+use lakesoul_metadata::{MetaDataClient, MetaDataClientRef};
+
+use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder};
+use lakesoul_io::lakesoul_writer::{MultiPartAsyncWriter, AsyncBatchWriter};
+
+use super::{commit_data, create_io_config_builder};
+
+#[derive(Debug, Clone)]
+pub struct LakeSoulSinkProvider {
+    table_name: String,
+    schema: SchemaRef,
+    io_config: LakeSoulIOConfig,
+    postgres_config: Option<String>,
+}
+
+impl LakeSoulSinkProvider {
+    pub async fn new(table_name: &str) -> crate::error::Result<Self> {
+        Self::create_provider(table_name, Arc::new(MetaDataClient::from_env().await?), None).await
+    }
+
+    pub async fn new_with_postgres_config(table_name: &str, postgres_config: &str) -> crate::error::Result<Self> {
+        Self::create_provider(table_name, Arc::new(MetaDataClient::from_config(postgres_config.to_string()).await?), Some(postgres_config.to_string())).await
+    }
+
+    pub async fn create_provider(table_name: &str, client: MetaDataClientRef, postgres_config: Option<String>) -> crate::error::Result<Self> {
+        let io_config = create_io_config_builder(client, Some(table_name))
+            .await?
+            .build();
+        let schema = io_config.schema();
+        let table_name = table_name.to_string();
+        Ok(Self {
+            table_name,
+            io_config,
+            schema,
+            postgres_config,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProvider for LakeSoulSinkProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        _projections: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Scan not implemented for LakeSoulSinkProvider".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+
+    async fn insert_into(
+        &self,
+        _state: &SessionState,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let client = Arc::new(
+            if let Some(postgres_config) = &self.postgres_config {
+                match MetaDataClient::from_config(postgres_config.clone()).await {
+                    Ok(client) => client,
+                    Err(e) => return Err(DataFusionError::External(Box::new(e)))
+                }
+            } else {
+                match MetaDataClient::from_env().await {
+                    Ok(client) => client,
+                    Err(e) => return Err(DataFusionError::External(Box::new(e)))
+                }
+            }
+        );
+        Ok(Arc::new(
+            LakeSoulParquetSinkExec::new(
+                input,
+                Arc::new(
+                    LakeSoulParquekSink::new(client, self.table_name.clone(), self.io_config.clone().into())
+                )
+            )
+        ))
+    }
+}
+
+#[derive(Debug, Clone)]
+struct LakeSoulParquetSinkExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Sink to which to write
+    sink: Arc<dyn DataSink>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl LakeSoulParquetSinkExec {
+    fn new(
+        input: Arc<dyn ExecutionPlan>,
+        sink: Arc<dyn DataSink>,
+    ) -> Self {
+        Self {
+            input,
+            sink,
+            schema: Arc::new(Schema::empty())
+        }
+    }
+}
+
+impl DisplayAs for LakeSoulParquetSinkExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
+        write!(f, "LakeSoulParquetSinkExec")
+    }
+}
+
+impl ExecutionPlan for LakeSoulParquetSinkExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
+        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // Require that the InsertExec gets the data in the order the
+        // input produced it (otherwise the optimizer may choose to reorder
+        // the input which could result in unintended / poor UX)
+        //
+        // More rationale:
+        // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
+        vec![self
+            .input
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self {
+            input: children[0].clone(),
+            sink: self.sink.clone(),
+            schema: self.schema.clone(),
+        }))
+    }
+
+    /// Execute the plan and return a stream of `RecordBatch`es for
+    /// the specified partition.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        if partition != 0 {
+            return Err(DataFusionError::Internal(
+                format!("Invalid requested partition {partition}. InsertExec requires a single input partition."
+            )));
+        }
+
+        // Execute each of our own input's partitions and pass them to the sink
+        let input_partition_count = self.input.output_partitioning().partition_count();
+        if input_partition_count != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "Invalid input partition count {input_partition_count}. \
+                 InsertExec needs only a single partition."
+            )));
+        }
+
+        let data = self.input.execute(0, context.clone())?;
+        let schema = self.schema.clone();
+        let sink = self.sink.clone();
+
+        let stream = futures::stream::once(async move {
+            sink.write_all(data, &context).await.map(make_count_batch)
+        })
+        .boxed();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+/// Create an output record batch with a count
+///
+/// ```text
+/// +-------+,
+/// | count |,
+/// +-------+,
+/// | 6     |,
+/// +-------+,
+/// ```
+fn make_count_batch(count: u64) -> RecordBatch {
+    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
+
+    RecordBatch::try_from_iter_with_nullable(vec![("row_count", array, false)]).unwrap()
+}
+
+fn make_count_schema() -> SchemaRef {
+    // define a schema.
+    Arc::new(Schema::new(vec![Field::new(
+        "count",
+        DataType::UInt64,
+        false,
+    )]))
+}
+
+/// A [`DataSink`] that writes incoming record batches into a LakeSoul table as
+/// parquet files and commits the written files through the metadata client.
+struct LakeSoulParquekSink {
+    table_name: String,
+    client: MetaDataClientRef,
+    config_builder: LakeSoulIOConfigBuilder,
+    schema: SchemaRef,
+}
+
+impl Debug for LakeSoulParquekSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LakeSoulParquekSink")
+            .finish()
+    }
+}
+
+impl DisplayAs for LakeSoulParquekSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "LakeSoulParquekSink")
+            }
+        }
+    }
+}
+
+impl LakeSoulParquekSink {
+    fn new(client: MetaDataClientRef, table_name: String, config_builder: LakeSoulIOConfigBuilder) -> Self {
+        let schema = config_builder.schema();
+        Self { client, table_name, config_builder, schema }
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[async_trait]
+impl DataSink for LakeSoulParquekSink {
+    async fn write_all(
+        &self,
+        mut data: SendableRecordBatchStream,
+        _context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let sink_schema = self.schema();
+        if self.config_builder.primary_keys_slice().is_empty() {
+            if sink_schema != data.schema() {
+                return Err(DataFusionError::ArrowError(ArrowError::SchemaError(format!("Schema mismatch: to-write record_batch[{:?}] v.s. parquet writer[{:?}]", data.schema(), sink_schema))))
+            }
+        } else if !data.schema().fields().iter().all(|f| sink_schema.field_with_name(f.name()).is_ok()) {
+            return Err(DataFusionError::ArrowError(ArrowError::SchemaError(format!("Extra column is present at to-write record_batch[{:?}], sink schema is {:?}", data.schema(), self.schema()))));
+        }
+        let mut row_count = 0;
+        let table_name = self.table_name.as_str();
+        // Write the incoming stream into a new timestamped parquet file under the table's temp-dir path.
+        let file = [std::env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
+        let builder = self.config_builder.clone()
+            .with_files(vec![file])
+            .with_schema(data.schema());
+        let mut async_writer = MultiPartAsyncWriter::try_new(builder.clone().build()).await?;
+        while let Some(batch) = data.next().await.transpose()?
{
+            row_count += batch.num_rows();
+            async_writer.write_record_batch(batch).await?;
+        }
+        Box::new(async_writer).flush_and_close().await?;
+
+        match commit_data(self.client.clone(), table_name, builder.build()).await {
+            Ok(_) => Ok(row_count as u64),
+            Err(e) => Err(DataFusionError::External(Box::new(e)))
+        }
+    }
+}
\ No newline at end of file
diff --git a/rust/lakesoul-datafusion/src/catalog/lakesoul_source.rs b/rust/lakesoul-datafusion/src/catalog/lakesoul_source.rs
new file mode 100644
index 000000000..b2ef04b8e
--- /dev/null
+++ b/rust/lakesoul-datafusion/src/catalog/lakesoul_source.rs
@@ -0,0 +1,86 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{sync::Arc, any::Any};
+
+use lakesoul_io::datafusion::logical_expr::TableProviderFilterPushDown;
+use lakesoul_io::datafusion::physical_plan::ExecutionPlan;
+use lakesoul_io::lakesoul_io_config::create_session_context;
+use lakesoul_io::{datafusion, arrow};
+
+use lakesoul_io::datasource::parquet_source::LakeSoulParquetProvider;
+use lakesoul_metadata::{MetaDataClientRef, MetaDataClient};
+
+use datafusion::error::Result;
+use datafusion::execution::context::SessionState;
+use datafusion::{datasource::TableProvider, logical_expr::TableType, prelude::Expr};
+
+use arrow::datatypes::SchemaRef;
+
+use async_trait::async_trait;
+
+use super::create_io_config_builder;
+
+#[derive(Debug, Clone)]
+pub struct LakeSoulSourceProvider {
+    table_name: String,
+    inner: LakeSoulParquetProvider,
+    postgres_config: Option<String>,
+}
+
+impl LakeSoulSourceProvider {
+    pub async fn new(table_name: &str) -> crate::error::Result<Self> {
+        Self::create_provider(table_name, Arc::new(MetaDataClient::from_env().await?), None).await
+    }
+
+    pub async fn new_with_postgres_config(table_name: &str, postgres_config: &str) -> crate::error::Result<Self> {
+        Self::create_provider(table_name, Arc::new(MetaDataClient::from_config(postgres_config.to_string()).await?), Some(postgres_config.to_string())).await
+    }
+
+    pub async fn create_provider(table_name: &str, client: MetaDataClientRef, postgres_config: Option<String>) -> crate::error::Result<Self> {
+        let io_config = create_io_config_builder(client, Some(table_name))
+            .await?
+            .build();
+        let context = create_session_context(&mut io_config.clone())?;
+        let inner = LakeSoulParquetProvider::from_config(io_config).build_with_context(&context).await?;
+        let table_name = table_name.to_string();
+        Ok(Self {
+            table_name,
+            inner,
+            postgres_config,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProvider for LakeSoulSourceProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        self.inner.table_type()
+    }
+
+    fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
+        self.inner.supports_filters_pushdown(filters)
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projections: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.inner.scan(_state, projections, _filters, _limit).await
+    }
+}
+
diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs
new file mode 100644
index 000000000..f706ebda7
--- /dev/null
+++ b/rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -0,0 +1,128 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+use std::time::SystemTime;
+use std::{env, path::PathBuf};
+
+use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder};
+use lakesoul_metadata::MetaDataClientRef;
+use lakesoul_io::serde_json;
+use proto::proto::entity::{TableInfo, DataCommitInfo, DataFileOp, FileOp, CommitOp, Uuid, MetaInfo};
+
+use crate::serialize::arrow_java::{ArrowJavaSchema, schema_from_metadata_str};
+// use crate::transaction::TransactionMetaInfo;
+use crate::error::Result;
+
+pub mod lakesoul_sink;
+pub mod lakesoul_source;
+
+pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, config: LakeSoulIOConfig) -> Result<()> {
+    client.create_table(
+        TableInfo {
+            table_id: format!("table_{}", uuid::Uuid::new_v4().to_string()),
+            table_name: table_name.to_string(),
+            table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::<PathBuf>().to_str().unwrap().to_string(),
+            table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into()).unwrap(),
+            table_namespace: "default".to_string(),
+            properties: "{}".to_string(),
+            partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::<Vec<&str>>().join(",").as_str(),
+            domain: "public".to_string(),
+        }).await?;
+    Ok(())
+}
+
+pub(crate) async fn create_io_config_builder(client: MetaDataClientRef, table_name: Option<&str>) -> lakesoul_metadata::error::Result<LakeSoulIOConfigBuilder> {
+    if let Some(table_name) = table_name {
+        let table_info = client.get_table_info_by_table_name(table_name, "default").await?;
+        let data_files = client.get_data_files_by_table_name(table_name, vec![], "default").await?;
+        let schema_str = client.get_schema_by_table_name(table_name, "default").await?;
+        let schema = schema_from_metadata_str(schema_str.as_str());
+        Ok(LakeSoulIOConfigBuilder::new()
+            .with_files(data_files)
+            .with_schema(schema)
+            .with_primary_keys(
+                parse_table_info_partitions(table_info.partitions).1
+            ))
+    } else {
+        Ok(LakeSoulIOConfigBuilder::new())
+    }
+}
+
+pub(crate) fn parse_table_info_partitions(partitions: String) -> (Vec<String>, Vec<String>) {
+    let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap());
+    let hash_keys = &hash_keys[1..];
+    (
+        range_keys.split(',')
+            .collect::<Vec<&str>>()
+            .iter()
+            .filter_map(|str| if str.is_empty() {
+                None
+            }
else { + Some(str.to_string()) + }) + .collect::>(), + hash_keys.split(',') + .collect::>() + .iter() + .filter_map(|str| if str.is_empty() { + None + } else { + Some(str.to_string()) + }) + .collect::>() + ) +} + +pub(crate) async fn commit_data(client: MetaDataClientRef, table_name: &str, config: LakeSoulIOConfig) -> Result<()>{ + let table_name_id = client.get_table_name_id_by_table_name(table_name, "default").await?; + client.commit_data_commit_info(DataCommitInfo { + table_id: table_name_id.table_id, + partition_desc: "-5".to_string(), + file_ops: config.files_slice() + .iter() + .map(|file| DataFileOp { + file_op: FileOp::Add as i32, + path: file.clone(), + ..Default::default() + }) + .collect(), + commit_op: CommitOp::AppendCommit as i32, + timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64, + commit_id: { + let (high, low) = uuid::Uuid::new_v4().as_u64_pair(); + Some(Uuid{high, low}) + }, + ..Default::default() + }).await?; + Ok(()) +} + +// pub(crate) async fn commit_meta_info(meta_info: TransactionMetaInfo)->Result<()> { +// let mut table_info = meta_info.table_info.clone(); +// let mut json_map = json::parse(&table_info.properties).unwrap(); +// dbg!(&json_map); +// json_map.insert("hashBucketNum", "").unwrap(); +// table_info.properties = json_map.to_string(); + +// let info = MetaInfo { +// table_info: Some(table_info), +// list_partition: vec![], +// read_partition_info: vec![], +// }; + +// add_data_info(&info)?; +// let mut client = MetaDataClient::from_env().await?; +// client.commit_data(info, meta_info.commit_type).await?; +// update_table_schema(); +// Ok(()) +// } + +fn update_table_schema() {} + +pub(crate) fn add_data_info(meta_info: &MetaInfo)->Result<()> { + Ok(()) +} \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/error.rs b/rust/lakesoul-datafusion/src/error.rs index c3059b84f..c0c2e24b8 100644 --- a/rust/lakesoul-datafusion/src/error.rs +++ b/rust/lakesoul-datafusion/src/error.rs @@ -4,7 +4,7 @@ use std::{error::Error, sync::Arc, result, fmt::Display}; -use lakesoul_io::lakesoul_reader::DataFusionError; +use lakesoul_io::lakesoul_reader::{DataFusionError, ArrowError}; use lakesoul_metadata::error::LakeSoulMetaDataError; @@ -21,6 +21,9 @@ pub type GenericError = Box; pub enum LakeSoulError { MetaDataError(LakeSoulMetaDataError), DataFusionError(DataFusionError), + ArrowError(ArrowError), + JsonError(json::Error), + Internal(String), } impl From for LakeSoulError { @@ -35,11 +38,23 @@ impl From for LakeSoulError { } } +impl From for LakeSoulError { + fn from(err: ArrowError) -> Self { + Self::ArrowError(err) + } +} + impl Display for LakeSoulError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { LakeSoulError::MetaDataError(ref desc) => write!(f, "metadata error: {desc}"), LakeSoulError::DataFusionError(ref desc) => write!(f, "DataFusion error: {desc}"), + LakeSoulError::JsonError(ref desc) => write!(f, "json error: {desc}"), + LakeSoulError::ArrowError(ref desc) => write!(f, "arrow error: {desc}"), + LakeSoulError::Internal(ref desc) => { + write!(f, "Internal error: {desc}.\nThis was likely caused by a bug in LakeSoul's \ + code and we would welcome that you file an bug report in our issue tracker") + } } } } @@ -49,6 +64,9 @@ impl Error for LakeSoulError { match self { LakeSoulError::MetaDataError(e) => Some(e), LakeSoulError::DataFusionError(e) => Some(e), + LakeSoulError::JsonError(e) => Some(e), + LakeSoulError::ArrowError(e) => Some(e), + 
LakeSoulError::Internal(_) => None, } } } \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/lib.rs b/rust/lakesoul-datafusion/src/lib.rs index 2f9e599c0..9306f13c5 100644 --- a/rust/lakesoul-datafusion/src/lib.rs +++ b/rust/lakesoul-datafusion/src/lib.rs @@ -4,5 +4,6 @@ #[cfg(test)] mod test; - -mod error; \ No newline at end of file +mod catalog; +mod error; +mod serialize; \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/serialize/arrow_java.rs b/rust/lakesoul-datafusion/src/serialize/arrow_java.rs new file mode 100644 index 000000000..480568f7e --- /dev/null +++ b/rust/lakesoul-datafusion/src/serialize/arrow_java.rs @@ -0,0 +1,329 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, sync::Arc}; + +use lakesoul_io::arrow::datatypes::{FieldRef, Field, Fields, DataType, TimeUnit, SchemaRef, Schema}; +use lakesoul_io::serde_json; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(tag = "name")] +enum ArrowJavaType { + #[serde(rename = "null")] + Null, + #[serde(rename = "struct")] + Struct, + #[serde(rename = "list")] + List, + #[serde(rename = "largelist")] + LargeList, + #[serde(rename = "fixedsizelist")] + FixedSizeList { + #[serde(rename = "listSize")] + list_size: i32 + }, + #[serde(rename = "union")] + Union, + #[serde(rename = "map")] + Map { + #[serde(rename = "keysSorted")] + keys_sorted: bool + }, + #[serde(rename = "int")] + Int { + #[serde(rename = "isSigned")] + is_signed: bool, + #[serde(rename = "bitWidth")] + bit_width: i32, + }, + #[serde(rename = "floatingpoint")] + FloatingPoint { + precision: String, + }, + #[serde(rename = "utf8")] + Utf8, + #[serde(rename = "largeutf8")] + LargeUtf8, + #[serde(rename = "binary")] + Binary, + #[serde(rename = "largebinary")] + LargeBinary, + #[serde(rename = "fixedsizebinary")] + FixedSizeBinary { + #[serde(rename = "bitWidth")] + bit_width: i32, + }, + #[serde(rename = "bool")] + Bool, + #[serde(rename = "decimal")] + Decimal { + precision: u8, + scale: i8, + #[serde(rename = "bitWidth")] + bit_width: i32, + }, + #[serde(rename = "date")] + Date { + unit: String, + }, + #[serde(rename = "time")] + Time { + #[serde(rename = "bitWidth")] + bit_width: i32, + unit: String, + }, + #[serde(rename = "timestamp")] + Timestamp { + unit: String, + timezone: Option, + }, + #[serde(rename = "interval")] + Interval, + #[serde(rename = "duration")] + Duration, +} + + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct ArrowJavaField{ + name: String, + #[serde(rename = "type")] + data_type: ArrowJavaType, + nullable: bool, + children: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ArrowJavaSchema { + fields: Vec, + /// A map of key-value pairs containing additional meta data. 
+ metadata: Option>, +} + +impl From<&FieldRef> for ArrowJavaField { + fn from(field: &FieldRef) -> Self { + let name = field.name().clone(); + let (data_type, children) = match field.data_type() { + DataType::Null => (ArrowJavaType::Null, vec![]), + + DataType::Struct(fields) => (ArrowJavaType::Struct, fields.iter().map(ArrowJavaField::from).collect::>()), + + DataType::List(field) => (ArrowJavaType::List, vec![ArrowJavaField::from(field)]), + DataType::LargeList(field) => (ArrowJavaType::LargeList, vec![ArrowJavaField::from(field)]), + DataType::FixedSizeList(field, list_size ) => (ArrowJavaType::FixedSizeList { list_size: *list_size }, vec![ArrowJavaField::from(field)]), + + + DataType::Map(struct_field, key_sorted) => (ArrowJavaType::Map { keys_sorted: *key_sorted }, vec![ArrowJavaField::from(struct_field)]), + + DataType::Int8 => (ArrowJavaType::Int { is_signed: true, bit_width: 8 }, vec![]), + DataType::Int16 => (ArrowJavaType::Int { is_signed: true, bit_width: 16 }, vec![]), + DataType::Int32 => (ArrowJavaType::Int { is_signed: true, bit_width: 32 }, vec![]), + DataType::Int64 => (ArrowJavaType::Int { is_signed: true, bit_width: 64 }, vec![]), + DataType::UInt8 => (ArrowJavaType::Int { is_signed: false, bit_width: 8 }, vec![]), + DataType::UInt16 => (ArrowJavaType::Int { is_signed: false, bit_width: 16 }, vec![]), + DataType::UInt32 => (ArrowJavaType::Int { is_signed: false, bit_width: 32 }, vec![]), + DataType::UInt64 => (ArrowJavaType::Int { is_signed: false, bit_width: 64 }, vec![]), + + DataType::Float16 => (ArrowJavaType::FloatingPoint { precision: "HALF".to_string() }, vec![]), + DataType::Float32 => (ArrowJavaType::FloatingPoint { precision: "SINGLE".to_string() }, vec![]), + DataType::Float64 => (ArrowJavaType::FloatingPoint { precision: "DOUBLE".to_string() }, vec![]), + + DataType::Utf8 => (ArrowJavaType::Utf8, vec![]), + DataType::LargeUtf8 => (ArrowJavaType::LargeUtf8, vec![]), + + DataType::Binary => (ArrowJavaType::Binary, vec![]), + DataType::LargeBinary => (ArrowJavaType::LargeBinary, vec![]), + DataType::FixedSizeBinary(bit_width) => (ArrowJavaType::FixedSizeBinary { bit_width: *bit_width }, vec![]), + + DataType::Boolean => (ArrowJavaType::Bool, vec![]), + + DataType::Decimal128(precision, scale) => (ArrowJavaType::Decimal { precision: *precision, scale: *scale, bit_width: 128 }, vec![]), + DataType::Decimal256(precision, scale) => (ArrowJavaType::Decimal { precision: *precision, scale: *scale, bit_width: 256 }, vec![]), + + DataType::Date32 => (ArrowJavaType::Date { unit: "DAY".to_string() }, vec![]), + DataType::Date64 => (ArrowJavaType::Date { unit: "MILLISECOND".to_string() }, vec![]), + + DataType::Time32(unit) => + ( + ArrowJavaType::Time { + bit_width: 32, + unit: match unit { + TimeUnit::Second => "SECOND".to_string(), + TimeUnit::Microsecond => "MICROSECOND".to_string(), + TimeUnit::Millisecond => "MILLISECOND".to_string(), + TimeUnit::Nanosecond => "NANOSECOND".to_string(), + } }, + vec![] + ), + DataType::Time64(unit) => + ( + ArrowJavaType::Time { + bit_width: 64, + unit: match unit { + TimeUnit::Second => "SECOND".to_string(), + TimeUnit::Microsecond => "MICROSECOND".to_string(), + TimeUnit::Millisecond => "MILLISECOND".to_string(), + TimeUnit::Nanosecond => "NANOSECOND".to_string(), + } }, + vec![] + ), + DataType::Timestamp(unit, timezone) => + (ArrowJavaType::Timestamp { + unit: match unit { + TimeUnit::Second => "SECOND".to_string(), + TimeUnit::Microsecond => "MICROSECOND".to_string(), + TimeUnit::Millisecond => "MILLISECOND".to_string(), + 
TimeUnit::Nanosecond => "NANOSECOND".to_string(), + }, + timezone: timezone.as_ref().map(|s| s.to_string()) + }, + vec![] + ), + + DataType::Union(_, _) => todo!("Union type not supported"), + DataType::Dictionary(_, _) => todo!("Dictionary type not supported"), + DataType::Duration(_) => todo!("Duration type not supported"), + DataType::Interval(_) => todo!("Interval type not supported"), + DataType::RunEndEncoded(_, _) => todo!("RunEndEncoded type not supported"), + }; + let nullable = field.is_nullable(); + ArrowJavaField { + name, + data_type, + nullable, + children + } + } +} + +impl From<&ArrowJavaField> for Field { + fn from(field: &ArrowJavaField) -> Field { + let java_type = &field.data_type.clone(); + let data_type = match java_type { + ArrowJavaType::Null => DataType::Null, + ArrowJavaType::Struct => + DataType::Struct( + Fields::from(field + .children + .iter() + .map(|f| f.into()) + .collect::>() + ) + ), + ArrowJavaType::List => { + assert!(field.children.len() == 1); + DataType::List(Arc::new(field.children.first().unwrap().into())) + } + ArrowJavaType::LargeList => { + assert!(field.children.len() == 1); + DataType::LargeList(Arc::new(field.children.first().unwrap().into())) + } + ArrowJavaType::FixedSizeList { list_size } => { + assert!(field.children.len() == 1); + DataType::FixedSizeList(Arc::new(field.children.first().unwrap().into()), *list_size) + } + ArrowJavaType::Union => todo!("Union type not supported"), + ArrowJavaType::Map{keys_sorted} => { + assert!(field.children.len() == 1); + DataType::Map(Arc::new(field.children.first().unwrap().into()), *keys_sorted) + } + ArrowJavaType::Int { is_signed, bit_width } => { + if *is_signed { + match bit_width { + 8 => DataType::Int8, + 16 => DataType::Int16, + 32 => DataType::Int32, + 64 => DataType::Int64, + other => panic!("Int has an invalid bit_width = {}", other), + } + } else { + match bit_width { + 8 => DataType::UInt8, + 16 => DataType::UInt16, + 32 => DataType::UInt32, + 64 => DataType::UInt64, + other => panic!("Int has an invalid bit_width = {}", other), + } + } + } + ArrowJavaType::FloatingPoint { precision } => + match precision.as_str() { + "HALF" => DataType::Float16, + "SINGLE" => DataType::Float32, + "DOUBLE" => DataType::Float64, + other => panic!("FloatingPoint has an invalid precision = {}", other), + } + ArrowJavaType::Utf8 => DataType::Utf8, + ArrowJavaType::LargeUtf8 => DataType::LargeUtf8, + ArrowJavaType::Binary => DataType::Binary, + ArrowJavaType::LargeBinary => DataType::LargeBinary, + ArrowJavaType::FixedSizeBinary { bit_width } => DataType::FixedSizeBinary(*bit_width), + ArrowJavaType::Bool => DataType::Boolean, + ArrowJavaType::Decimal { precision, scale, bit_width } if *bit_width > 128 => DataType::Decimal256(*precision, *scale), + ArrowJavaType::Decimal { precision, scale, bit_width } => DataType::Decimal128(*precision, *scale), + ArrowJavaType::Date { unit } if unit == "DAY" => DataType::Date32, + ArrowJavaType::Date { unit } => DataType::Date64, + ArrowJavaType::Time { bit_width, unit } => { + let time_unit = match unit.as_str() { + "SECOND" => TimeUnit::Second, + "MILLISECOND" => TimeUnit::Millisecond, + "MICROSECOND" => TimeUnit::Microsecond, + "NANOSECOND" => TimeUnit::Nanosecond, + other => panic!("TimeUnit has an invalid value = {}", other), + }; + match bit_width { + 32 => DataType::Time32(time_unit), + 64 => DataType::Time64(time_unit), + other => panic!("Time has an invalid bit_width = {}", other), + } + } + ArrowJavaType::Timestamp { unit, timezone } => { + let time_unit = match 
unit.as_str() { + "SECOND" => TimeUnit::Second, + "MILLISECOND" => TimeUnit::Millisecond, + "MICROSECOND" => TimeUnit::Microsecond, + "NANOSECOND" => TimeUnit::Nanosecond, + other => panic!("TimeUnit has an invalid value = {}", other), + }; + let timezone: Option> = timezone.as_ref().map(|t| Arc::from(t.as_str())); + DataType::Timestamp(time_unit, timezone) + }, + ArrowJavaType::Interval => todo!("Interval type not supported"), + ArrowJavaType::Duration => todo!("Duration type not supported"), + }; + Field::new( + field.name.clone(), + data_type, + field.nullable + ) + } +} + +impl From for ArrowJavaSchema { + fn from(schema: SchemaRef) -> Self { + Self { + fields: schema.fields().iter().map(ArrowJavaField::from).collect::>(), + metadata: None + } + } +} + +impl From for SchemaRef { + fn from(schema: ArrowJavaSchema) -> Self { + SchemaRef::new(Schema::new( + schema.fields + .iter() + .map(|f| f.into()) + .collect::>() + )) + } +} + + +pub fn schema_from_metadata_str(s: &str) -> SchemaRef { + serde_json::from_str::(s).map_or_else( |_| { + let java_schema = serde_json::from_str::(s).unwrap(); + java_schema.into() + }, SchemaRef::new) +} diff --git a/rust/lakesoul-datafusion/src/serialize/mod.rs b/rust/lakesoul-datafusion/src/serialize/mod.rs new file mode 100644 index 000000000..cd51f820c --- /dev/null +++ b/rust/lakesoul-datafusion/src/serialize/mod.rs @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +pub mod arrow_java; \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/test/insert_tests.rs b/rust/lakesoul-datafusion/src/test/insert_tests.rs new file mode 100644 index 000000000..ddd3a0877 --- /dev/null +++ b/rust/lakesoul-datafusion/src/test/insert_tests.rs @@ -0,0 +1,514 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +mod insert_tests { + use std::ops::Deref; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::SystemTime; + + use lakesoul_io::arrow::array::*; + use lakesoul_io::arrow::datatypes::{Int32Type, i256}; + use lakesoul_io::arrow::util::pretty::print_batches; + use lakesoul_io::datafusion::dataframe; + use lakesoul_io::datafusion::logical_expr::{LogicalPlan, DmlStatement, LogicalPlanBuilder}; + use lakesoul_io::datafusion::prelude::DataFrame; + use lakesoul_io::datasource::parquet_source::LakeSoulParquetProvider; + use lakesoul_io::filter::parser::Parser; + use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader}; + use lakesoul_io::{arrow, datafusion, tokio}; + use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, create_session_context}; + use datafusion::assert_batches_eq; + use tokio::runtime::{Runtime, Builder}; + use arrow::{record_batch::RecordBatch, + array::{Int32Array, ArrayRef}, + datatypes::{SchemaRef, Schema, Field, DataType} + }; + use lakesoul_metadata::{MetaDataClient, MetaDataClientRef}; + + use crate::catalog::lakesoul_source::LakeSoulSourceProvider; + use crate::{error::Result, catalog::{create_table, create_io_config_builder}}; + use crate::catalog::lakesoul_sink::LakeSoulSinkProvider; + + async fn init_table(client: MetaDataClientRef, schema: SchemaRef, table_name: &str) -> Result<()> { + let builder = LakeSoulIOConfigBuilder::new() + .with_schema(schema.clone()); + // .with_primary_keys(pks); + create_table(client, table_name, builder.build()).await + } + + async fn init_partitioned_table(client: MetaDataClientRef, schema: SchemaRef, table_name: &str, partition_key: 
Vec<&str>) -> Result<()> { + // todo: partitioned table is replaced by primary key table currently + let builder = LakeSoulIOConfigBuilder::new() + .with_schema(schema.clone()) + .with_primary_keys(partition_key.into_iter().map(String::from).collect()); + create_table(client, table_name, builder.build()).await + } + + + async fn do_insert(client: MetaDataClientRef, record_batch: RecordBatch, table_name: &str) -> Result<()> { + let builder = create_io_config_builder(client, None).await?; + let sess_ctx = create_session_context(&mut builder.clone().build())?; + + let provider = LakeSoulSinkProvider::new(table_name).await?; + sess_ctx.register_table(table_name, Arc::new(provider)).unwrap(); + + let num_rows = record_batch.num_rows(); + let schema = record_batch.schema(); + let logical_plan = LogicalPlanBuilder::insert_into( + sess_ctx.read_batch(record_batch)?.into_unoptimized_plan(), + table_name.to_string(), + &schema.deref())? + .build()?; + let dataframe = DataFrame::new(sess_ctx.state(), logical_plan); + + let results = dataframe + // .explain(true, false)? + .collect() + .await?; + + assert_batches_eq!(&[ + "+-----------+", + "| row_count |", + "+-----------+", + format!("| {:<10}|", num_rows).as_str(), + "+-----------+", + ], &results); + Ok(()) + } + + async fn check_insert( + client: MetaDataClientRef, + table_name: &str, + selected_cols: Vec<&str>, + filters: Option, + expected: &[&str] + ) -> Result<()> { + let builder = create_io_config_builder(client, None).await?; + let sess_ctx = create_session_context(&mut builder.clone().build())?; + + let provider = LakeSoulSourceProvider::new(table_name).await?; + let dataframe = sess_ctx.read_table(Arc::new(provider))?; + let schema = SchemaRef::new(dataframe.schema().into()); + + let dataframe = if let Some(f) = filters { + dataframe.filter(Parser::parse(f.clone(), schema))? + } else { + dataframe + }; + + let dataframe = if selected_cols.is_empty() { + dataframe + } else { + dataframe.select_columns(&selected_cols)? 
+ }; + + + + let result = dataframe + .collect() + .await?; + + assert_batches_eq!(expected, &result); + Ok(()) + } + + fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch { + let values = values + .into_iter() + .map(|vec| Arc::new(Int32Array::from(Vec::from(vec))) as ArrayRef) + .collect::>(); + let iter = names.into_iter().zip(values).map(|(name, array)| (name, array, true)).collect::>(); + RecordBatch::try_from_iter_with_nullable(iter).unwrap() + } + + async fn test_insert_into_append() -> Result<()> { + let table_name = "test_insert_into_append"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_table( + client.clone(), + record_batch.schema(), + table_name, + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["id", "data"], + None, + &[ + "+----+------+", + "| id | data |", + "+----+------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 3 |", + "+----+------+", + ], + ).await + } + + async fn test_insert_into_append_by_position() -> Result<()> { + let table_name = "test_insert_into_append_by_position"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_table( + client.clone(), + record_batch.schema(), + table_name, + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["data", "id"], + None, + &[ + "+------+----+", + "| data | id |", + "+------+----+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 3 |", + "+------+----+", + ], + ).await + } + + async fn test_insert_into_append_partitioned_table() -> Result<()> { + let table_name = "test_insert_into_append_partitioned_table"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_partitioned_table( + client.clone(), + record_batch.schema(), + table_name, + vec!["id"] + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["data", "id"], + None, + &[ + "+------+----+", + "| data | id |", + "+------+----+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 3 |", + "+------+----+", + ], + ).await + } + + async fn test_insert_into_append_non_partitioned_table_and_read_with_filter() -> Result<()> { + let table_name = "test_insert_into_append_non_partitioned_table_and_read_with_filter"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_table( + client.clone(), + record_batch.schema(), + table_name, + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["data", "id"], + Some("and(noteq(id, null), lteq(id, 2))".to_string()), + &[ + "+------+----+", + "| data | id |", + "+------+----+", + "| 1 | 1 |", + "| 2 | 2 |", + "+------+----+", + ], + ).await + } + + async fn test_insert_into_append_partitioned_table_and_read_with_partition_filter() -> Result<()> { + let table_name = "test_insert_into_append_partitioned_table_and_read_with_partition_filter"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + 
init_partitioned_table( + client.clone(), + record_batch.schema(), + table_name, + vec!["id"] + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["data", "id"], + Some("and(noteq(id, null), lteq(id, 2))".to_string()), + &[ + "+------+----+", + "| data | id |", + "+------+----+", + "| 1 | 1 |", + "| 2 | 2 |", + "+------+----+", + ], + ).await + } + + // todo: insert_overwrite is not supported by datafusion 27.0 + // #[tokio::test] + async fn test_insert_into_overwrite_non_partitioned_table() -> Result<()> { + let table_name = "test_insert_into_overwrite_non_partitioned_table"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_table( + client.clone(), + record_batch.schema(), + table_name, + ).await?; + do_insert( + client.clone(), + record_batch, + table_name,).await?; + // todo: should do_insert_overwrite + do_insert( + client.clone(), + create_batch_i32(vec!["id", "data"], vec![&[4, 5, 6], &[4, 5, 6]]), + table_name,).await?; + check_insert( + client.clone(), + table_name, + vec!["id", "data"], + None, + &[ + "+----+------+", + "| id | data |", + "+----+------+", + "| 4 | 4 |", + "| 5 | 5 |", + "| 6 | 6 |", + "+----+------+", + ], + ).await + } + + async fn test_insert_into_fails_when_missing_a_column() -> Result<()> { + let table_name = "test_insert_into_fails_when_missing_a_column"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = create_batch_i32(vec!["id", "data"], vec![&[1, 2, 3], &[1, 2, 3]]); + init_table( + client.clone(), + SchemaRef::new(Schema::new(["id", "data", "missing"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + table_name, + ).await?; + match do_insert( + client.clone(), + record_batch, + table_name).await { + Err(e) => { + Ok(()) + } + Ok(()) => Err(crate::error::LakeSoulError::Internal("InsertInto should fail when missing columns".to_string())) + } + } + + async fn test_insert_into_fails_when_an_extra_column_is_present_but_can_evolve_schema() -> Result<()> { + let table_name = "test_insert_into_fails_when_an_extra_column_is_present_but_can_evolve_schema"; + let client = Arc::new(MetaDataClient::from_env().await?); + let record_batch = RecordBatch::try_from_iter_with_nullable(vec![ + ("id", Arc::new(Int32Array::from(vec![1])) as ArrayRef, true), + ("data", Arc::new(StringArray::from(vec!["a"])) as ArrayRef, true), + ("fruit", Arc::new(StringArray::from(vec!["mango"])) as ArrayRef, true), + ]).unwrap(); + init_table( + client.clone(), + SchemaRef::new(Schema::new(["id", "data"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + table_name, + ).await?; + match do_insert( + client.clone(), + record_batch, + table_name).await { + Err(e) => { + Ok(()) + } + Ok(()) => Err(crate::error::LakeSoulError::Internal("InsertInto should fails when an extra column is present but can evolve schema".to_string())) + } + // todo: pass this case when SCHEMA_AUTO_MIGRATE is true + } + + async fn test_datatypes() -> Result<()>{ + let table_name = "test_datatypes"; + let client = Arc::new(MetaDataClient::from_env().await?); + // let mut client = MetaDataClient::from_config("host=127.0.0.1 port=5433 dbname=test_lakesoul_meta user=yugabyte password=yugabyte".to_string()); + + let iter = vec![ + ("Boolean", Arc::new(BooleanArray::from(vec![true, false])) as ArrayRef, true), + ("Binary", 
Arc::new(BinaryArray::from_vec(vec![&[1u8], &[2u8, 3u8]])) as ArrayRef, true), + + ("Date32", Arc::new(Date32Array::from(vec![1, -2])) as ArrayRef, true), + ("Date64", Arc::new(Date64Array::from(vec![1, -2])) as ArrayRef, true), + ("Decimal128", Arc::new(Decimal128Array::from(vec![1, -2])) as ArrayRef, true), + ("Decimal256", Arc::new(Decimal256Array::from(vec![Some(i256::default()), None])) as ArrayRef, true), + + // ParquetError(ArrowError("Converting Duration to parquet not supported")) + // ("DurationMicrosecond", Arc::new(DurationMicrosecondArray::from(vec![1])) as ArrayRef, true), + // ("DurationMillisecond", Arc::new(DurationMillisecondArray::from(vec![1])) as ArrayRef, true), + + // ("Float16", Arc::new(Float16Array::from(vec![1.0])) as ArrayRef, true), + + ("FixedSizeBinary", Arc::new(FixedSizeBinaryArray::from(vec![&[1u8][..], &[2u8][..]])) as ArrayRef, true), + ("FixedSizeList", Arc::new(FixedSizeListArray::from_iter_primitive::(vec![ + Some(vec![Some(0), Some(1), Some(2)]), + None, + // Some(vec![Some(3), None, Some(5)]), + // Some(vec![Some(6), Some(7)]), + ], 3)) as ArrayRef, true), + + ("Float32", Arc::new(Float32Array::from(vec![1.0, -1.0])) as ArrayRef, true), + ("Float64", Arc::new(Float64Array::from(vec![1.0, -1.0])) as ArrayRef, true), + + + ("Int8", Arc::new(Int8Array::from(vec![1i8, -2i8])) as ArrayRef, true), + // ("Int8Dictionary", Arc::new(Int8DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("Int16", Arc::new(Int16Array::from(vec![1i16, -2i16])) as ArrayRef, true), + // ("Int16Dictionary", Arc::new(Int16DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("Int32", Arc::new(Int32Array::from(vec![1i32, -2i32])) as ArrayRef, true), + // ("Int32Dictionary", Arc::new(Int32DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("Int64", Arc::new(Int64Array::from(vec![1i64, -2i64])) as ArrayRef, true), + // ("Int64Dictionary", Arc::new(Int64DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + + // ("IntervalDayTime", Arc::new(IntervalDayTimeArray::from(vec![1, 2])) as ArrayRef, true), + // ParquetError(NYI("Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented")) + //("IntervalMonthDayNano", Arc::new(IntervalMonthDayNanoArray::from(vec![1])) as ArrayRef, true), + // ("IntervalYearMonth", Arc::new(IntervalYearMonthArray::from(vec![1, 2])) as ArrayRef, true), + + ("Map", Arc::new({ + let string_builder = StringBuilder::new(); + let int_builder = Int32Builder::with_capacity(4); + + // Construct `[{"joe": 1}, {"blogs": 2, "foo": 4}]` + let mut builder = MapBuilder::new(None, string_builder, int_builder); + + builder.keys().append_value("joe"); + builder.values().append_value(1); + builder.append(true).unwrap(); + + builder.keys().append_value("blogs"); + builder.values().append_value(2); + builder.keys().append_value("foo"); + builder.values().append_value(4); + builder.append(true).unwrap(); + + builder.finish() + }) as ArrayRef, true), + + ("Null", Arc::new(NullArray::new(2)) as ArrayRef, true), + + ("LargeBinary", Arc::new(LargeBinaryArray::from_vec(vec![&[1u8], &[2u8, 3u8]])) as ArrayRef, true), + ("LargeString", Arc::new(LargeStringArray::from(vec!["1", ""])) as ArrayRef, true), + + ("List", Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(0), Some(1), Some(2)]), + None, + // Some(vec![Some(3), None, Some(5)]), + // Some(vec![Some(6), Some(7)]), + ])) as ArrayRef, true), + + // ParquetError(ArrowError("Converting RunEndEncodedType to parquet not 
supported")) + // ("Run", Arc::new(RunArray::::from_iter([Some("a"), None])) as ArrayRef, true), + + ("String", Arc::new(StringArray::from(vec!["1", ""])) as ArrayRef, true), + ("Struct", Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("b", DataType::Boolean, false)), + Arc::new(BooleanArray::from(vec![false, true])) as ArrayRef, + ), + ( + Arc::new(Field::new("c", DataType::Int32, false)), + Arc::new(Int32Array::from(vec![42, 31])) as ArrayRef, + ), + ])) as ArrayRef, true), + + ("Time32Millisecond", Arc::new(Time32MillisecondArray::from(vec![1i32, -2i32])) as ArrayRef, true), + ("Time32Second", Arc::new(Time32SecondArray::from(vec![1i32, -2i32])) as ArrayRef, true), + ("Time64Microsecond", Arc::new(Time64MicrosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + ("Time64Nanosecond", Arc::new(Time64NanosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + ("TimestampMicrosecond", Arc::new(TimestampMicrosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + ("TimestampMillisecond", Arc::new(TimestampMillisecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + ("TimestampNanosecond", Arc::new(TimestampNanosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + ("TimestampSecond", Arc::new(TimestampSecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), + + ("UInt8", Arc::new(UInt8Array::from(vec![1u8, 2u8])) as ArrayRef, true), + // ("UInt8Dictionary", Arc::new(UInt8DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("UInt16", Arc::new(UInt16Array::from(vec![1u16, 2u16])) as ArrayRef, true), + // ("UInt16Dictionary", Arc::new(UInt16DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("UInt32", Arc::new(UInt32Array::from(vec![1u32, 2u32])) as ArrayRef, true), + // ("UInt32Dictionary", Arc::new(UInt32DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ("UInt64", Arc::new(UInt64Array::from(vec![1u64, 2u64])) as ArrayRef, true), + // ("UInt64Dictionary", Arc::new(UInt64DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), + ]; + let record_batch = RecordBatch::try_from_iter_with_nullable(iter).unwrap(); + init_table( + client.clone(), + record_batch.schema(), + table_name, + ).await?; + do_insert(client.clone(), record_batch, table_name).await?; + check_insert(client.clone(), table_name, vec![], None, &[ + "+---------+--------+------------+---------------------+---------------+--------------+-----------------+---------------+---------+---------+------+-------+-------+-------+--------------------+------+-------------+-------------+-----------+--------+-------------------+-----------------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------------+----------------------------------------------------------------------------+----------------------------+-------------------------+-------------------------------+---------------------+-------+--------+--------+--------+", + "| Boolean | Binary | Date32 | Date64 | Decimal128 | Decimal256 | FixedSizeBinary | FixedSizeList | Float32 | Float64 | Int8 | Int16 | Int32 | Int64 | Map | Null | LargeBinary | LargeString | List | String | Struct | Time32Millisecond | Time32Second | Time64Microsecond | Time64Nanosecond | TimestampMicrosecond | TimestampMillisecond | TimestampNanosecond | TimestampSecond | UInt8 | UInt16 | UInt32 | UInt64 |", + 
"+---------+--------+------------+---------------------+---------------+--------------+-----------------+---------------+---------+---------+------+-------+-------+-------+--------------------+------+-------------+-------------+-----------+--------+-------------------+-----------------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------------+----------------------------------------------------------------------------+----------------------------+-------------------------+-------------------------------+---------------------+-------+--------+--------+--------+", + "| true | 01 | 1970-01-02 | 1970-01-01T00:00:00 | 0.0000000001 | 0.0000000000 | 01 | [0, 1, 2] | 1.0 | 1.0 | 1 | 1 | 1 | 1 | {joe: 1} | | 01 | 1 | [0, 1, 2] | 1 | {b: false, c: 42} | 00:00:00.001 | 00:00:01 | 00:00:00.000001 | 00:00:00.000000001 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.001 | 1970-01-01T00:00:00.000000001 | 1970-01-01T00:00:01 | 1 | 1 | 1 | 1 |", + "| false | 0203 | 1969-12-30 | 1970-01-01T00:00:00 | -0.0000000002 | | 02 | | -1.0 | -1.0 | -2 | -2 | -2 | -2 | {blogs: 2, foo: 4} | | 0203 | | | | {b: true, c: 31} | ERROR: Cast error: Failed to convert -2 to temporal for Time32(Millisecond) | ERROR: Cast error: Failed to convert -2 to temporal for Time32(Second) | ERROR: Cast error: Failed to convert -2 to temporal for Time64(Microsecond) | ERROR: Cast error: Failed to convert -2 to temporal for Time64(Nanosecond) | 1969-12-31T23:59:59.999998 | 1969-12-31T23:59:59.998 | 1969-12-31T23:59:59.999999998 | 1969-12-31T23:59:58 | 2 | 2 | 2 | 2 |", + "+---------+--------+------------+---------------------+---------------+--------------+-----------------+---------------+---------+---------+------+-------+-------+-------+--------------------+------+-------------+-------------+-----------+--------+-------------------+-----------------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------------+----------------------------------------------------------------------------+----------------------------+-------------------------+-------------------------------+---------------------+-------+--------+--------+--------+" + ]).await + } + + + #[tokio::test] + async fn test_all_cases() -> Result<()> { + + test_insert_into_append().await?; + test_insert_into_append_by_position().await?; + test_insert_into_append_partitioned_table().await?; + test_insert_into_append_non_partitioned_table_and_read_with_filter().await?; + test_insert_into_append_partitioned_table_and_read_with_partition_filter().await?; + + test_insert_into_fails_when_missing_a_column().await?; + test_insert_into_fails_when_an_extra_column_is_present_but_can_evolve_schema().await?; + + test_datatypes().await?; + + // overwrite case + // todo: insert_overwrite is not supported by datafusion 27.0 + + // test_insert_into_overwrite_non_partitioned_table().await?; + // test_insert_into_overwrite_by_position().await?; + + // todo: + // test_insert_info_schema_enforcement().await?; + // test_insert_info_struct_types_and_schema_enforcement().await?; + Ok(()) + } + + +} diff --git a/rust/lakesoul-datafusion/src/test/mod.rs b/rust/lakesoul-datafusion/src/test/mod.rs index 4ca097d7b..73c130eb6 100644 --- a/rust/lakesoul-datafusion/src/test/mod.rs +++ 
b/rust/lakesoul-datafusion/src/test/mod.rs @@ -2,4 +2,23 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + +use lakesoul_metadata::MetaDataClient; + +mod insert_tests; mod upsert_tests; +// mod update_tests; + +#[ctor::ctor] +fn init() { + lakesoul_io::tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let client = Arc::new(MetaDataClient::from_env().await.unwrap()); + client.meta_cleanup().await.unwrap(); + println!("clean metadata"); + }) +} \ No newline at end of file diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs index 20a881e16..32bd821b1 100644 --- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs +++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs @@ -810,11 +810,13 @@ mod upsert_with_io_config_tests { } mod upsert_with_metadata_tests { + use std::ops::Deref; use std::sync::Arc; use std::env; use std::path::PathBuf; use std::time::SystemTime; + use lakesoul_io::filter::parser::Parser; use lakesoul_io::{arrow, datafusion, tokio, serde_json}; use lakesoul_io::arrow::array::*; @@ -830,95 +832,21 @@ mod upsert_with_metadata_tests { use datafusion::assert_batches_eq; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion::logical_expr::LogicalPlanBuilder; + use crate::catalog::lakesoul_sink::LakeSoulSinkProvider; + use crate::catalog::lakesoul_source::LakeSoulSourceProvider; use crate::error::Result; use proto::proto::entity::{TableInfo, DataCommitInfo, FileOp, DataFileOp, CommitOp, Uuid}; use tokio::runtime::{Builder, Runtime}; - use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, LakeSoulIOConfig}; + use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfigBuilder, LakeSoulIOConfig, create_session_context}; use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader}; use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; - use lakesoul_metadata::{Client, PreparedStatementMap, MetaDataClient}; - - - async fn commit_data(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()>{ - let table_name_id = client.get_table_name_id_by_table_name(table_name, "default").await?; - client.commit_data_commit_info(DataCommitInfo { - table_id: table_name_id.table_id, - partition_desc: "-5".to_string(), - file_ops: config.files_slice() - .iter() - .map(|file| DataFileOp { - file_op: FileOp::Add as i32, - path: file.clone(), - ..Default::default() - }) - .collect(), - commit_op: CommitOp::AppendCommit as i32, - timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64, - commit_id: { - let (high, low) = uuid::Uuid::new_v4().as_u64_pair(); - Some(Uuid{high, low}) - }, - ..Default::default() - }).await?; - Ok(()) - } - - async fn create_table(client: &mut MetaDataClient, table_name: &str, config: LakeSoulIOConfig) -> Result<()> { - client.create_table( - TableInfo { - table_id: format!("table_{}", uuid::Uuid::new_v4().to_string()), - table_name: table_name.to_string(), - table_path: [env::temp_dir().to_str().unwrap(), table_name].iter().collect::().to_str().unwrap().to_string(), - table_schema: serde_json::to_string(&config.schema()).unwrap(), - table_namespace: "default".to_string(), - properties: "{}".to_string(), - partitions: ";".to_owned() + config.primary_keys_slice().iter().map(String::as_str).collect::>().join(",").as_str(), - domain: "public".to_string(), - }).await?; - Ok(()) - } + use lakesoul_metadata::{Client, 
PreparedStatementMap, MetaDataClient, MetaDataClientRef}; - async fn create_io_config_builder(client: &mut MetaDataClient, table_name: &str) -> lakesoul_metadata::error::Result { - let table_info = client.get_table_info_by_table_name(table_name, "default").await?; - let data_files = client.get_data_files_by_table_name(table_name, vec![], "default").await?; - let schema_str = client.get_schema_by_table_name(table_name, "default").await?; - let schema = serde_json::from_str::(schema_str.as_str())?; - - Ok(LakeSoulIOConfigBuilder::new() - .with_files(data_files) - .with_schema(Arc::new(schema)) - .with_primary_keys( - parse_table_info_partitions(table_info.partitions).1 - )) - } + use crate::catalog::{create_io_config_builder, create_table, commit_data, parse_table_info_partitions}; - fn parse_table_info_partitions(partitions: String) -> (Vec, Vec) { - let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap()); - let hash_keys = &hash_keys[1..]; - ( - range_keys.split(',') - .collect::>() - .iter() - .filter_map(|str| if str.is_empty() { - None - } else { - Some(str.to_string()) - }) - .collect::>(), - hash_keys.split(',') - .collect::>() - .iter() - .filter_map(|str| if str.is_empty() { - None - } else { - Some(str.to_string()) - }) - .collect::>() - ) - } fn create_batch_i32(names: Vec<&str>, values: Vec<&[i32]>) -> RecordBatch { let values = values @@ -929,67 +857,96 @@ mod upsert_with_metadata_tests { RecordBatch::try_from_iter_with_nullable(iter).unwrap() } - - async fn execute_upsert(batch: RecordBatch, table_name: &str, client: &mut MetaDataClient) -> Result<()> { - let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::().to_str().unwrap().to_string(); - let builder = create_io_config_builder(client, table_name).await?.with_file(file.clone()).with_schema(batch.schema()); - let config = builder.clone().build(); - - let writer = SyncSendableMutableLakeSoulWriter::try_new(config, Builder::new_current_thread().build().unwrap()).unwrap(); - writer.write_batch(batch)?; - writer.flush_and_close()?; - commit_data(client, table_name, builder.clone().build()).await + async fn execute_upsert(record_batch: RecordBatch, table_name: &str, client: MetaDataClientRef) -> Result<()> { + let builder = create_io_config_builder(client, None).await?; + let sess_ctx = create_session_context(&mut builder.clone().build())?; + + let provider = LakeSoulSinkProvider::new(table_name).await?; + sess_ctx.register_table(table_name, Arc::new(provider)).unwrap(); + + let num_rows = record_batch.num_rows(); + let schema = record_batch.schema(); + let logical_plan = LogicalPlanBuilder::insert_into( + sess_ctx.read_batch(record_batch)?.into_unoptimized_plan(), + table_name.to_string(), + &schema.deref())? + .build()?; + let dataframe = DataFrame::new(sess_ctx.state(), logical_plan); + + let results = dataframe + // .explain(true, false)? 
+ .collect() + .await?; + + // print_batches(&results); + + assert_batches_eq!(&[ + "+-----------+", + "| row_count |", + "+-----------+", + format!("| {:<10}|", num_rows).as_str(), + "+-----------+", + ], &results); + Ok(()) } - - + async fn init_table(batch: RecordBatch, table_name: &str, schema: SchemaRef, pks:Vec, client: MetaDataClientRef) -> Result<()> { + let builder = LakeSoulIOConfigBuilder::new() + .with_schema(schema) + .with_primary_keys(pks); + create_table(client.clone(), table_name, builder.build()).await?; + execute_upsert(batch, table_name, client).await + } - async fn init_table(batch: RecordBatch, table_name: &str, pks:Vec, client: &mut MetaDataClient) -> Result<()> { + async fn init_table_without_schema(batch: RecordBatch, table_name: &str, pks:Vec, client: MetaDataClientRef) -> Result<()> { let schema = batch.schema(); let builder = LakeSoulIOConfigBuilder::new() - .with_schema(schema.clone()) + .with_schema(schema) .with_primary_keys(pks); - create_table(client, table_name, builder.build()).await?; + create_table(client.clone(), table_name, builder.build()).await?; execute_upsert(batch, table_name, client).await } + async fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option, client: MetaDataClientRef, expected: &[&str]) -> Result<()> { + execute_upsert(batch, table_name, client.clone()).await?; + let builder = create_io_config_builder(client, None).await?; + let sess_ctx = create_session_context(&mut builder.clone().build())?; + + let provider = LakeSoulSourceProvider::new(table_name).await?; + let dataframe = sess_ctx.read_table(Arc::new(provider))?; + let schema = SchemaRef::new(dataframe.schema().into()); + let dataframe = if let Some(f) = filters { + dataframe.filter(Parser::parse(f.clone(), schema))? + } else { + dataframe + }; - async fn check_upsert(batch: RecordBatch, table_name: &str, selected_cols: Vec<&str>, filters: Option, client: &mut MetaDataClient, expected: &[&str]) -> Result<()> { - execute_upsert(batch, table_name, client).await?; - let builder = create_io_config_builder(client, table_name).await?; - let builder = builder - .with_schema(SchemaRef::new(Schema::new( - selected_cols.iter().map(|col| Field::new(*col, arrow::datatypes::DataType::Int32, true)).collect::>() - ))); - let builder = if let Some(filters) = filters { - builder.with_filter_str(filters) + let dataframe = if selected_cols.is_empty() { + dataframe } else { - builder + dataframe.select_columns(&selected_cols)? }; - let mut reader = SyncSendableMutableLakeSoulReader::new(LakeSoulReader::new(builder.build()).unwrap(), Builder::new_current_thread().build().unwrap()); - reader.start_blocked()?; - let result = reader.next_rb_blocked(); - match result { - Some(result) => { - assert_batches_eq!(expected, &[result?]); - Ok(()) - }, - None => Ok(()) - } + + let result = dataframe + // .explain(true, false)? 
+ .collect() + .await?; + + // print_batches(&result); + assert_batches_eq!(expected, &result); + Ok(()) } - // #[tokio::test] - async fn test_merge_same_column_i32() -> Result<()>{ - let table_name = "merge-same_column"; - let mut client = MetaDataClient::from_env().await?; - // let mut client = MetaDataClient::from_config("host=127.0.0.1 port=5433 dbname=test_lakesoul_meta user=yugabyte password=yugabyte".to_string()); - client.meta_cleanup().await?; + async fn test_merge_same_column_i32() -> Result<()> { + let table_name = "test_merge_same_column_i32"; + let client = Arc::new(MetaDataClient::from_env().await?); init_table( create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), table_name, + SchemaRef::new(Schema::new(["range", "hash", "value"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), vec!["range".to_string(), "hash".to_string()], - &mut client, + client.clone(), ).await?; check_upsert( @@ -997,7 +954,7 @@ mod upsert_with_metadata_tests { table_name, vec!["range", "hash", "value"], None, - &mut client, + client.clone(), &[ "+----------+------+-------+", "| range | hash | value |", @@ -1012,127 +969,74 @@ mod upsert_with_metadata_tests { ).await } - // #[test] - async fn test_datatypes() -> Result<()>{ - let table_name = "test_datatypes"; - let mut client = MetaDataClient::from_env().await?; - // let mut client = MetaDataClient::from_config("host=127.0.0.1 port=5433 dbname=test_lakesoul_meta user=yugabyte password=yugabyte".to_string()); - client.meta_cleanup().await?; - - let iter = vec![ - ("Boolean", Arc::new(BooleanArray::from(vec![true, false])) as ArrayRef, true), - ("Binary", Arc::new(BinaryArray::from_vec(vec![&[1u8], &[2u8, 3u8]])) as ArrayRef, true), - - ("Date32", Arc::new(Date32Array::from(vec![1, -2])) as ArrayRef, true), - ("Date64", Arc::new(Date64Array::from(vec![1, -2])) as ArrayRef, true), - ("Decimal128", Arc::new(Decimal128Array::from(vec![1, -2])) as ArrayRef, true), - ("Decimal256", Arc::new(Decimal256Array::from(vec![Some(i256::default()), None])) as ArrayRef, true), - - // ParquetError(ArrowError("Converting Duration to parquet not supported")) - // ("DurationMicrosecond", Arc::new(DurationMicrosecondArray::from(vec![1])) as ArrayRef, true), - // ("DurationMillisecond", Arc::new(DurationMillisecondArray::from(vec![1])) as ArrayRef, true), - - // ("Float16", Arc::new(Float16Array::from(vec![1.0])) as ArrayRef, true), - - ("FixedSizeBinary", Arc::new(FixedSizeBinaryArray::from(vec![&[1u8][..], &[2u8][..]])) as ArrayRef, true), - ("FixedSizeList", Arc::new(FixedSizeListArray::from_iter_primitive::(vec![ - Some(vec![Some(0), Some(1), Some(2)]), - None, - // Some(vec![Some(3), None, Some(5)]), - // Some(vec![Some(6), Some(7)]), - ], 3)) as ArrayRef, true), - - ("Float32", Arc::new(Float32Array::from(vec![1.0, -1.0])) as ArrayRef, true), - ("Float64", Arc::new(Float64Array::from(vec![1.0, -1.0])) as ArrayRef, true), - - - ("Int8", Arc::new(Int8Array::from(vec![1i8, -2i8])) as ArrayRef, true), - ("Int8Dictionary", Arc::new(Int8DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ("Int16", Arc::new(Int16Array::from(vec![1i16, -2i16])) as ArrayRef, true), - ("Int16Dictionary", Arc::new(Int16DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ("Int32", Arc::new(Int32Array::from(vec![1i32, -2i32])) as ArrayRef, true), - ("Int32Dictionary", Arc::new(Int32DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ("Int64", 
Arc::new(Int64Array::from(vec![1i64, -2i64])) as ArrayRef, true), - ("Int64Dictionary", Arc::new(Int64DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - - ("IntervalDayTime", Arc::new(IntervalDayTimeArray::from(vec![1, 2])) as ArrayRef, true), - // ParquetError(NYI("Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented")) - //("IntervalMonthDayNano", Arc::new(IntervalMonthDayNanoArray::from(vec![1])) as ArrayRef, true), - ("IntervalYearMonth", Arc::new(IntervalYearMonthArray::from(vec![1, 2])) as ArrayRef, true), - - ("Map", Arc::new({ - let string_builder = StringBuilder::new(); - let int_builder = Int32Builder::with_capacity(4); - - // Construct `[{"joe": 1}, {"blogs": 2, "foo": 4}]` - let mut builder = MapBuilder::new(None, string_builder, int_builder); - - builder.keys().append_value("joe"); - builder.values().append_value(1); - builder.append(true).unwrap(); - - builder.keys().append_value("blogs"); - builder.values().append_value(2); - builder.keys().append_value("foo"); - builder.values().append_value(4); - builder.append(true).unwrap(); - - builder.finish() - }) as ArrayRef, true), - - ("Null", Arc::new(NullArray::new(2)) as ArrayRef, true), - - ("LargeBinary", Arc::new(LargeBinaryArray::from_vec(vec![&[1u8], &[2u8, 3u8]])) as ArrayRef, true), - ("LargeString", Arc::new(LargeStringArray::from(vec!["1", ""])) as ArrayRef, true), - - ("List", Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(0), Some(1), Some(2)]), - None, - // Some(vec![Some(3), None, Some(5)]), - // Some(vec![Some(6), Some(7)]), - ])) as ArrayRef, true), - - // ParquetError(ArrowError("Converting RunEndEncodedType to parquet not supported")) - // ("Run", Arc::new(RunArray::::from_iter([Some("a"), None])) as ArrayRef, true), - - ("String", Arc::new(StringArray::from(vec!["1", ""])) as ArrayRef, true), - ("Struct", Arc::new(StructArray::from(vec![ - ( - Arc::new(Field::new("b", DataType::Boolean, false)), - Arc::new(BooleanArray::from(vec![false, true])) as ArrayRef, - ), - ( - Arc::new(Field::new("c", DataType::Int32, false)), - Arc::new(Int32Array::from(vec![42, 31])) as ArrayRef, - ), - ])) as ArrayRef, true), - - ("Time32Millisecond", Arc::new(Time32MillisecondArray::from(vec![1i32, -2i32])) as ArrayRef, true), - ("Time32Second", Arc::new(Time32SecondArray::from(vec![1i32, -2i32])) as ArrayRef, true), - ("Time64Microsecond", Arc::new(Time64MicrosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - ("Time64Nanosecond", Arc::new(Time64NanosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - ("TimestampMicrosecond", Arc::new(TimestampMicrosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - ("TimestampMillisecond", Arc::new(TimestampMillisecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - ("TimestampNanosecond", Arc::new(TimestampNanosecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - ("TimestampSecond", Arc::new(TimestampSecondArray::from(vec![1i64, -2i64])) as ArrayRef, true), - - ("UInt8", Arc::new(UInt8Array::from(vec![1u8, 2u8])) as ArrayRef, true), - ("UInt8Dictionary", Arc::new(UInt8DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ("UInt16", Arc::new(UInt16Array::from(vec![1u16, 2u16])) as ArrayRef, true), - ("UInt16Dictionary", Arc::new(UInt16DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ("UInt32", Arc::new(UInt32Array::from(vec![1u32, 2u32])) as ArrayRef, true), - ("UInt32Dictionary", Arc::new(UInt32DictionaryArray::from_iter([Some("a"), None])) as 
ArrayRef, true), - ("UInt64", Arc::new(UInt64Array::from(vec![1u64, 2u64])) as ArrayRef, true), - ("UInt64Dictionary", Arc::new(UInt64DictionaryArray::from_iter([Some("a"), None])) as ArrayRef, true), - ]; - let batch = RecordBatch::try_from_iter_with_nullable(iter).unwrap(); + async fn test_merge_different_column_i32() -> Result<()> { + let table_name = "test_merge_different_column_i32"; + let client = Arc::new(MetaDataClient::from_env().await?); + init_table( - batch, - // create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), - table_name, - // vec!["range".to_string(), "hash".to_string()], - vec![], - &mut client, + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + SchemaRef::new(Schema::new(["range", "hash", "value", "name"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + vec!["range".to_string(), "hash".to_string()], + client.clone(), + ).await?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "name"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value", "name"], + None, + client.clone(), + &[ + "+----------+------+-------+------+", + "| range | hash | value | name |", + "+----------+------+-------+------+", + "| 20201101 | 1 | 1 | 11 |", + "| 20201101 | 2 | 2 | |", + "| 20201101 | 3 | 3 | 33 |", + "| 20201101 | 4 | | 44 |", + "| 20201102 | 4 | 4 | |", + "+----------+------+-------+------+", + ] ).await } + + async fn test_merge_different_columns_and_filter_by_non_selected_columns_i32() -> Result<()> { + let table_name = "test_merge_different_columns_and_filter_by_non_selected_columns_i32"; + let client = Arc::new(MetaDataClient::from_env().await?); + init_table( + create_batch_i32(vec!["range", "hash", "value"], vec![&[20201101, 20201101, 20201101, 20201102], &[1, 2, 3, 4], &[1, 2, 3, 4]]), + table_name, + SchemaRef::new(Schema::new(["range", "hash", "value", "name"].into_iter().map(|name| Field::new(name, DataType::Int32, true)).collect::>())), + vec!["range".to_string(), "hash".to_string()], + client.clone(), + ).await?; + + check_upsert( + create_batch_i32(vec!["range", "hash", "name"], vec![&[20201101, 20201101, 20201101], &[1, 3, 4], &[11, 33, 44]]), + table_name, + vec!["range", "hash", "value"], + Some("and(noteq(name, null), gt(name, 0))".to_string()), + client.clone(), + &[ + "+----------+------+-------+", + "| range | hash | value |", + "+----------+------+-------+", + "| 20201101 | 1 | 1 |", + "| 20201101 | 3 | 3 |", + "| 20201101 | 4 | |", + "+----------+------+-------+", + ] + ).await + + } + + #[tokio::test] + async fn test_all_cases() -> Result<()> { + test_merge_same_column_i32().await?; + test_merge_different_column_i32().await?; + test_merge_different_columns_and_filter_by_non_selected_columns_i32().await?; + Ok(()) + } + } diff --git a/rust/lakesoul-io/src/datasource/mod.rs b/rust/lakesoul-io/src/datasource/mod.rs index eeb3c244b..de47b1339 100644 --- a/rust/lakesoul-io/src/datasource/mod.rs +++ b/rust/lakesoul-io/src/datasource/mod.rs @@ -3,4 +3,3 @@ // SPDX-License-Identifier: Apache-2.0 pub mod parquet_source; -pub mod parquet_sink; diff --git a/rust/lakesoul-io/src/datasource/parquet_sink.rs b/rust/lakesoul-io/src/datasource/parquet_sink.rs deleted file mode 100644 index 45a41bac3..000000000 --- a/rust/lakesoul-io/src/datasource/parquet_sink.rs +++ /dev/null @@ -1,192 +0,0 @@ -// 
SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - - -use std::any::Any; -use std::fmt::{self, Debug}; -use std::sync::Arc; - -use async_trait::async_trait; - -use datafusion::common::{Statistics, DataFusionError}; - -use datafusion::datasource::TableProvider; -use datafusion::error::Result; -use datafusion::execution::context::{SessionState, TaskContext}; -use datafusion::logical_expr::{ - Expr, TableType, -}; -use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Distribution, stream::RecordBatchStreamAdapter -}; -use datafusion::arrow::datatypes::{Schema, SchemaRef}; - -use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig}; -use crate::lakesoul_writer::MultiPartAsyncWriter; -use crate::transform::uniform_schema; - -#[derive(Debug, Clone)] -pub struct LakeSoulParquetSinkProvider{ - schema: SchemaRef, - config: LakeSoulIOConfig -} - -#[async_trait] -impl TableProvider for LakeSoulParquetSinkProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - _state: &SessionState, - _projections: Option<&Vec>, - // filters and limit can be used here to inject some push-down operations if needed - _filters: &[Expr], - _limit: Option, - ) -> Result> { - let msg = "Scan not implemented for LakeSoulParquetSinkProvider".to_owned(); - Err(DataFusionError::NotImplemented(msg)) - } - - - async fn insert_into( - &self, - _state: &SessionState, - input: Arc, - ) -> Result> { - let writer_schema = self.schema(); - let mut writer_config = self.config.clone(); - writer_config.schema = IOSchema(uniform_schema(writer_schema)); - let _writer = MultiPartAsyncWriter::try_new(writer_config).await?; - Ok(Arc::new(LakeSoulParquetSinkExec::new(input))) - } - -} - -#[derive(Debug, Clone)] -struct LakeSoulParquetSinkExec { - /// Input plan that produces the record batches to be written. - input: Arc, - /// Sink to whic to write - // sink: Arc, - /// Schema describing the structure of the data. 
- schema: SchemaRef, - -} - -impl LakeSoulParquetSinkExec { - fn new( - input: Arc, - ) -> Self { - Self { - input, - schema: Arc::new(Schema::empty()) - } - } -} - -impl DisplayAs for LakeSoulParquetSinkExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { - write!(f, "LakeSoulParquetSinkExec") - } -} - -impl ExecutionPlan for LakeSoulParquetSinkExec { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - /// Get the schema for this execution plan - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { - datafusion::physical_plan::Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn required_input_distribution(&self) -> Vec { - vec![Distribution::SinglePartition] - } - - fn required_input_ordering(&self) -> Vec>> { - // Require that the InsertExec gets the data in the order the - // input produced it (otherwise the optimizer may chose to reorder - // the input which could result in unintended / poor UX) - // - // More rationale: - // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 - vec![self - .input - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs)] - } - - fn maintains_input_order(&self) -> Vec { - vec![false] - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(Self { - input: children[0].clone(), - schema: self.schema.clone(), - })) - } - - /// Execute the plan and return a stream of `RecordBatch`es for - /// the specified partition. - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - if partition != 0 { - return Err(DataFusionError::Internal( - format!("Invalid requested partition {partition}. InsertExec requires a single input partition." - ))); - } - - // Execute each of our own input's partitions and pass them to the sink - let input_partition_count = self.input.output_partitioning().partition_count(); - if input_partition_count != 1 { - return Err(DataFusionError::Internal(format!( - "Invalid input partition count {input_partition_count}. \ - InsertExec needs only a single partition." 
- ))); - } - - let data = self.input.execute(0, context)?; - let schema = self.schema.clone(); - - - Ok(Box::pin(RecordBatchStreamAdapter::new(schema, data))) - } - - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} diff --git a/rust/lakesoul-io/src/datasource/parquet_source.rs b/rust/lakesoul-io/src/datasource/parquet_source.rs index e47a6de83..00921de1f 100644 --- a/rust/lakesoul-io/src/datasource/parquet_source.rs +++ b/rust/lakesoul-io/src/datasource/parquet_source.rs @@ -136,7 +136,7 @@ impl LakeSoulParquetProvider { } } - pub(crate) async fn build_with_context(&self, context: &SessionContext) -> Result { + pub async fn build_with_context(&self, context: &SessionContext) -> Result { let mut plans = vec![]; let mut full_schema = uniform_schema(self.config.schema.0.clone()).to_dfschema().unwrap(); for i in 0..self.config.files.len() { diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index a029d22be..08f2c2e99 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -189,6 +189,20 @@ impl LakeSoulIOConfigBuilder { pub fn build(self) -> LakeSoulIOConfig { self.config } + + pub fn schema(&self) -> SchemaRef { + self.config.schema() + } + + pub fn primary_keys_slice(&self) -> &[String] { + self.config.primary_keys_slice() + } +} + +impl From for LakeSoulIOConfigBuilder { + fn from(val: LakeSoulIOConfig) -> Self { + LakeSoulIOConfigBuilder { config: val } + } } /// First check envs for credentials, region and endpoint. diff --git a/rust/lakesoul-metadata/src/error.rs b/rust/lakesoul-metadata/src/error.rs index cbd6721fd..74b0dfeae 100644 --- a/rust/lakesoul-metadata/src/error.rs +++ b/rust/lakesoul-metadata/src/error.rs @@ -24,6 +24,7 @@ pub enum LakeSoulMetaDataError { ProstDecodeError(prost::DecodeError), ProstEncodeError(prost::EncodeError), Other(GenericError), + Internal(String), } impl From for LakeSoulMetaDataError { @@ -92,6 +93,11 @@ impl Display for LakeSoulMetaDataError { LakeSoulMetaDataError::Other(ref desc) => { write!(f, "Other error: {desc}") } + LakeSoulMetaDataError::Internal(ref desc) => { + write!(f, "Internal error: {desc}.\nThis was likely caused by a bug in LakeSoul's \ + code and we would welcome that you file an bug report in our issue tracker") + } + } } } @@ -107,6 +113,7 @@ impl Error for LakeSoulMetaDataError { LakeSoulMetaDataError::ProstDecodeError(e) => Some(e), LakeSoulMetaDataError::ProstEncodeError(e) => Some(e), LakeSoulMetaDataError::Other(e) => Some(e.as_ref()), + LakeSoulMetaDataError::Internal(_) => None, } } } diff --git a/rust/lakesoul-metadata/src/lib.rs b/rust/lakesoul-metadata/src/lib.rs index e093d8fde..25f7c0153 100644 --- a/rust/lakesoul-metadata/src/lib.rs +++ b/rust/lakesoul-metadata/src/lib.rs @@ -20,7 +20,7 @@ use tokio::spawn; pub use tokio_postgres::{NoTls, Client, Statement}; use postgres_types::{ToSql, FromSql}; -pub use metadata_client::MetaDataClient; +pub use metadata_client::{MetaDataClient, MetaDataClientRef}; pub const DAO_TYPE_QUERY_ONE_OFFSET : i32 = 0; pub const DAO_TYPE_QUERY_LIST_OFFSET : i32 = 100; diff --git a/rust/lakesoul-metadata/src/metadata_client.rs b/rust/lakesoul-metadata/src/metadata_client.rs index 6b47c0a92..daafee57c 100644 --- a/rust/lakesoul-metadata/src/metadata_client.rs +++ b/rust/lakesoul-metadata/src/metadata_client.rs @@ -2,10 +2,13 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::ops::DerefMut; +use std::sync::Arc; use std::{collections::HashMap, vec, env, fs}; use 
proto::proto::entity::{TablePathId, TableNameId, TableInfo, PartitionInfo, JniWrapper, DataCommitInfo, MetaInfo, CommitOp, self}; use prost::Message; +use tokio::sync::Mutex; use tokio_postgres::Client; use url::Url; @@ -14,10 +17,13 @@ use crate::{execute_insert, PreparedStatementMap, DaoType, create_connection, cl use crate::error::Result; pub struct MetaDataClient { - client: Client, - prepared: PreparedStatementMap, + client: Arc>, + prepared: Arc>, + max_retry: usize, } +pub type MetaDataClientRef = Arc; + impl MetaDataClient { pub async fn from_env() -> Result { match env::var("lakesoul_home") { @@ -45,16 +51,21 @@ impl MetaDataClient { } pub async fn from_config(config: String) -> Result { - let client = create_connection(config).await?; - let prepared = PreparedStatementMap::new(); + Self::from_config_and_max_retry(config, 3).await + } + + pub async fn from_config_and_max_retry(config: String, max_retry: usize) -> Result { + let client = Arc::new(Mutex::new(create_connection(config).await?)); + let prepared = Arc::new(Mutex::new(PreparedStatementMap::new())); Ok(Self { client, - prepared + prepared, + max_retry }) } pub async fn create_table( - &mut self, + &self, table_info: TableInfo ) -> Result<()> { self.insert_table_path_id(&table_path_id_from_table_info(&table_info)).await?; @@ -63,40 +74,57 @@ impl MetaDataClient { Ok(()) } - async fn execute_insert(&mut self, insert_type: i32, wrapper: JniWrapper) -> Result { - execute_insert(&mut self.client, &mut self.prepared, insert_type, wrapper).await + async fn execute_insert(&self, insert_type: i32, wrapper: JniWrapper) -> Result { + for times in 0..self.max_retry { + match execute_insert(self.client.lock().await.deref_mut(), self.prepared.lock().await.deref_mut(), insert_type, wrapper.clone()).await { + Ok(count) => return Ok(count), + Err(_) if times < self.max_retry => { + continue + } + Err(e) => return Err(e) + }; + } + Ok(0) } - async fn execute_query(&mut self, query_type: i32, joined_string: String) -> Result { - let encoded = execute_query( &self.client, &mut self.prepared, query_type, joined_string).await?; - Ok(JniWrapper::decode(prost::bytes::Bytes::from(encoded))?) 
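The new execute_insert/execute_query wrap each metadata call in a bounded retry loop driven by max_retry. Note that inside `for times in 0..self.max_retry` the guard `times < self.max_retry` always holds, so every failure takes the `continue` arm and an exhausted loop falls through to the `Ok(0)` / `Ok(Default::default())` fallback instead of the last error. A minimal, free-standing sketch of the intended bounded-retry behaviour follows; the helper name and generic signature are illustrative and not part of the crate:

// Retry an async operation up to `max_retry` times, returning the first success
// or, if every attempt fails, the error from the final attempt.
async fn with_retry<T, E, F, Fut>(max_retry: usize, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for _attempt in 0..max_retry {
        match op().await {
            // success: stop retrying immediately
            Ok(value) => return Ok(value),
            // remember the failure and move on to the next attempt
            Err(e) => last_err = Some(e),
        }
    }
    // all attempts failed: surface the last error rather than a silent default value
    // e.g. with_retry(3, || async { fallible_call().await }).await
    Err(last_err.expect("max_retry must be greater than zero"))
}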
+ async fn execute_query(&self, query_type: i32, joined_string: String) -> Result { + for times in 0..self.max_retry { + match execute_query( self.client.lock().await.deref_mut(), self.prepared.lock().await.deref_mut(), query_type, joined_string.clone()).await { + Ok(encoded) => return Ok(JniWrapper::decode(prost::bytes::Bytes::from(encoded))?), + Err(_) if times < self.max_retry => { + continue + } + Err(e) => return Err(e) + }; + } + Ok(Default::default()) } - async fn insert_table_info(&mut self, table_info: &TableInfo) -> Result { + async fn insert_table_info(&self, table_info: &TableInfo) -> Result { self.execute_insert(DaoType::InsertTableInfo as i32, JniWrapper{table_info: vec![table_info.clone()], ..Default::default()}).await } - async fn insert_table_name_id(&mut self, table_name_id: &TableNameId) -> Result{ + async fn insert_table_name_id(&self, table_name_id: &TableNameId) -> Result{ self.execute_insert(DaoType::InsertTableNameId as i32, JniWrapper{table_name_id: vec![table_name_id.clone()], ..Default::default()}).await } - async fn insert_table_path_id(&mut self, table_path_id: &TablePathId) -> Result{ + async fn insert_table_path_id(&self, table_path_id: &TablePathId) -> Result{ self.execute_insert(DaoType::InsertTablePathId as i32, JniWrapper{table_path_id: vec![table_path_id.clone()], ..Default::default()}).await } - async fn insert_data_commit_info(&mut self, data_commit_info: &DataCommitInfo) -> Result { + async fn insert_data_commit_info(&self, data_commit_info: &DataCommitInfo) -> Result { self.execute_insert(DaoType::InsertDataCommitInfo as i32, JniWrapper{data_commit_info: vec![data_commit_info.clone()], ..Default::default()}).await } - async fn transaction_insert_partition_info(&mut self, partition_info_list: Vec) -> Result { + async fn transaction_insert_partition_info(&self, partition_info_list: Vec) -> Result { self.execute_insert(DaoType::TransactionInsertPartitionInfo as i32, JniWrapper { partition_info: partition_info_list, ..Default::default()}).await } - pub async fn meta_cleanup(&mut self) -> Result { - clean_meta_for_test(&self.client).await + pub async fn meta_cleanup(&self) -> Result { + clean_meta_for_test(self.client.lock().await.deref_mut()).await } - pub async fn commit_data(&mut self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> { + pub async fn commit_data(&self, meta_info: MetaInfo, commit_op: CommitOp) -> Result<()> { let table_info = meta_info.table_info.unwrap(); if !table_info.table_name.is_empty() { // todo: updateTableShortName @@ -163,7 +191,7 @@ impl MetaDataClient { } } - async fn get_cur_partition_map(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + async fn get_cur_partition_map(&self, table_id: &str, partition_desc_list: &[String]) -> Result> { Ok(self.get_partition_info_by_table_id_and_partition_list(table_id, partition_desc_list).await? 
.iter() .map(|partition_info|(partition_info.partition_desc.clone(), partition_info.clone())) @@ -171,7 +199,7 @@ impl MetaDataClient { ) } - pub async fn commit_data_commit_info(&mut self, data_commit_info: DataCommitInfo) -> Result<()> { + pub async fn commit_data_commit_info(&self, data_commit_info: DataCommitInfo) -> Result<()> { let table_id = &data_commit_info.table_id; let partition_desc = &data_commit_info.partition_desc; let commit_op = data_commit_info.commit_op; @@ -202,25 +230,27 @@ impl MetaDataClient { }, CommitOp::from_i32(commit_op).unwrap()).await } - pub fn get_table_domain(&mut self, _table_id: &str) -> Result { + pub fn get_table_domain(&self, _table_id: &str) -> Result { + // todo: get property table_domain Ok("public".to_string()) } - pub async fn get_table_name_id_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + pub async fn get_table_name_id_by_table_name(&self, table_name: &str, namespace: &str) -> Result { match self.execute_query(DaoType::SelectTableNameIdByTableName as i32, [table_name, namespace].join(PARAM_DELIM)).await { Ok(wrapper) => Ok(wrapper.table_name_id[0].clone()), Err(err) => Err(err) } } - pub async fn get_table_info_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + pub async fn get_table_info_by_table_name(&self, table_name: &str, namespace: &str) -> Result { match self.execute_query(DaoType::SelectTableInfoByTableNameAndNameSpace as i32, [table_name, namespace].join(PARAM_DELIM)).await { + Ok(wrapper) if wrapper.table_info.is_empty() => Err(crate::error::LakeSoulMetaDataError::Internal(format!("Table '{}' not found", table_name))), Ok(wrapper) => Ok(wrapper.table_info[0].clone()), Err(err) => Err(err) } } - pub async fn get_table_info_by_table_id(&mut self, table_id: &str) -> Result { + pub async fn get_table_info_by_table_id(&self, table_id: &str) -> Result { match self.execute_query(DaoType::SelectTableInfoByTableId as i32, table_id.to_string()).await { Ok(wrapper) => Ok(wrapper.table_info[0].clone()), Err(err) => Err(err) @@ -228,7 +258,7 @@ impl MetaDataClient { } - pub async fn get_data_files_by_table_name(&mut self, table_name: &str, partitions: Vec<(&str, &str)>, namespace: &str) -> Result> { + pub async fn get_data_files_by_table_name(&self, table_name: &str, partitions: Vec<(&str, &str)>, namespace: &str) -> Result> { let partition_filter = partitions .iter() .map(|(k, v)| format!("{}={}", k, v)) @@ -259,7 +289,7 @@ impl MetaDataClient { Ok(data_commit_info_list) } - async fn get_data_commit_info_of_single_partition(&mut self, partition_info: &PartitionInfo) -> Result> { + async fn get_data_commit_info_of_single_partition(&self, partition_info: &PartitionInfo) -> Result> { let table_id = &partition_info.table_id; let partition_desc = &partition_info.partition_desc; let joined_commit_id = &partition_info.snapshot @@ -274,19 +304,19 @@ impl MetaDataClient { } } - pub async fn get_schema_by_table_name(&mut self, table_name: &str, namespace: &str) -> Result { + pub async fn get_schema_by_table_name(&self, table_name: &str, namespace: &str) -> Result { let table_info = self.get_table_info_by_table_name(table_name, namespace).await?; Ok(table_info.table_schema) } - pub async fn get_all_partition_info(&mut self, table_id: &str) -> Result> { + pub async fn get_all_partition_info(&self, table_id: &str) -> Result> { match self.execute_query(DaoType::ListPartitionByTableId as i32, table_id.to_string()).await { Ok(wrapper) => Ok(wrapper.partition_info), Err(e) => Err(e), } } - pub async fn 
get_single_data_commit_info(&mut self, table_id: &str, partition_desc: &str, commit_id: &str) -> Result> { + pub async fn get_single_data_commit_info(&self, table_id: &str, partition_desc: &str, commit_id: &str) -> Result> { match self.execute_query(DaoType::SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId as i32, [table_id, partition_desc, commit_id].join(PARAM_DELIM)).await { Ok(wrapper) => Ok(if wrapper.data_commit_info.is_empty() { None @@ -297,7 +327,7 @@ impl MetaDataClient { } } - pub async fn get_partition_info_by_table_id_and_partition_list(&mut self, table_id: &str, partition_desc_list: &[String]) -> Result> { + pub async fn get_partition_info_by_table_id_and_partition_list(&self, table_id: &str, partition_desc_list: &[String]) -> Result> { match self.execute_query(DaoType::ListPartitionDescByTableIdAndParList as i32, [table_id, partition_desc_list.join(PARTITION_DESC_DELIM).as_str()].join(PARAM_DELIM)).await { Ok(wrapper) => Ok(wrapper.partition_info), Err(e) => Err(e),
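With every accessor now taking &self, one client can be shared across tasks as MetaDataClientRef (an Arc<MetaDataClient>), and looking up a non-existent table surfaces as LakeSoulMetaDataError::Internal rather than an out-of-bounds index on wrapper.table_info[0]. A small usage sketch, assuming the error::Result alias is exported as it is used elsewhere in these tests; the helper name, table name, and "default" namespace are examples:

use std::sync::Arc;
use lakesoul_metadata::{MetaDataClient, MetaDataClientRef};

async fn describe_table(table_name: &str) -> lakesoul_metadata::error::Result<()> {
    // Connection settings are read from the environment, as in MetaDataClient::from_env().
    let client: MetaDataClientRef = Arc::new(MetaDataClient::from_env().await?);
    // A missing table now returns LakeSoulMetaDataError::Internal("Table '...' not found")
    // instead of panicking on an empty result set.
    let table_info = client.get_table_info_by_table_name(table_name, "default").await?;
    println!("schema for {}: {}", table_name, table_info.table_schema);
    Ok(())
}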