From 2594091fa9d5a0acf45a2b574a77fe93eae49237 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 10 Feb 2025 11:44:58 +0000 Subject: [PATCH 1/4] feat(metric-engine): introduce batch alter request handling --- src/metric-engine/src/engine.rs | 19 +- src/metric-engine/src/engine/alter.rs | 211 ++++++----- .../src/engine/alter/extract_new_columns.rs | 51 +++ .../src/engine/alter/validate.rs | 33 ++ src/metric-engine/src/engine/create.rs | 103 +----- .../src/engine/create/validate.rs | 5 +- src/metric-engine/src/engine/state.rs | 13 +- src/metric-engine/src/error.rs | 17 +- src/metric-engine/src/metadata_region.rs | 342 ++++++++---------- src/store-api/src/region_request.rs | 4 +- 10 files changed, 391 insertions(+), 407 deletions(-) create mode 100644 src/metric-engine/src/engine/alter/extract_new_columns.rs create mode 100644 src/metric-engine/src/engine/alter/validate.rs diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 8e69458c12fd..95261580bdff 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -146,12 +146,17 @@ impl RegionEngine for MetricEngine { }) } BatchRegionDdlRequest::Alter(requests) => { - self.handle_requests( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), - ) - .await + let mut extension_return_value = HashMap::new(); + let rows = self + .inner + .alter_regions(requests, &mut extension_return_value) + .await + .map_err(BoxedError::new)?; + + Ok(RegionResponse { + affected_rows: rows, + extensions: extension_return_value, + }) } BatchRegionDdlRequest::Drop(requests) => { self.handle_requests( @@ -184,7 +189,7 @@ impl RegionEngine for MetricEngine { RegionRequest::Close(close) => self.inner.close_region(region_id, close).await, RegionRequest::Alter(alter) => { self.inner - .alter_region(region_id, alter, &mut extension_return_value) + .alter_regions(vec![(region_id, alter)], &mut extension_return_value) .await } RegionRequest::Compact(_) => { diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 5fd0c13e78a7..8822572d2a3a 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -12,148 +12,167 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +mod extract_new_columns; +mod validate; + +use std::collections::{HashMap, HashSet}; use common_telemetry::error; -use snafu::{OptionExt, ResultExt}; +use extract_new_columns::extract_new_columns; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest}; use store_api::storage::RegionId; +use validate::validate_alter_region_requests; +use crate::engine::create::add_columns_to_physical_data_region; use crate::engine::MetricEngineInner; use crate::error::{ LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, + UnexpectedRequestSnafu, }; -use crate::utils::{to_data_region_id, to_metadata_region_id}; +use crate::utils::to_data_region_id; impl MetricEngineInner { - /// Dispatch region alter request - pub async fn alter_region( + pub async fn alter_regions( &self, - region_id: RegionId, - request: RegionAlterRequest, + mut requests: Vec<(RegionId, RegionAlterRequest)>, extension_return_value: &mut HashMap>, ) -> Result { - let is_altering_physical_region = self.is_physical_region(region_id); + if requests.is_empty() { + return Ok(0); + } - let result = if is_altering_physical_region { - self.alter_physical_region(region_id, request).await + let first_region_id = &requests.first().unwrap().0; + if self.is_physical_region(*first_region_id) { + ensure!( + requests.len() == 1, + UnexpectedRequestSnafu { + reason: "Physical table must be altered with single request".to_string(), + } + ); + let (region_id, request) = requests.pop().unwrap(); + self.alter_physical_region(region_id, request).await?; } else { - let physical_region_id = self.alter_logical_region(region_id, request).await?; - - // Add physical table's column to extension map. - // It's ok to overwrite existing key, as the latter come schema is more up-to-date - let physical_columns = self - .data_region - .physical_columns(physical_region_id) + self.alter_logical_regions(requests, extension_return_value) .await?; - extension_return_value.insert( - ALTER_PHYSICAL_EXTENSION_KEY.to_string(), - ColumnMetadata::encode_list(&physical_columns) - .context(SerializeColumnMetadataSnafu)?, - ); - - Ok(()) - }; - - result.map(|_| 0) + } + Ok(0) } - /// Return the physical region id behind this logical region - async fn alter_logical_region( + /// Alter multiple logical regions on the same physical region. + pub async fn alter_logical_regions( &self, - logical_region_id: RegionId, - request: RegionAlterRequest, - ) -> Result { + requests: Vec<(RegionId, RegionAlterRequest)>, + extension_return_value: &mut HashMap>, + ) -> Result { + validate_alter_region_requests(&requests)?; + + let first_logical_region_id = requests[0].0; + + // Finds new columns to add + let mut new_column_names = HashSet::new(); + let mut new_columns_to_add = vec![]; + let (physical_region_id, index_options) = { let state = &self.state.read().unwrap(); let physical_region_id = state - .get_physical_region_id(logical_region_id) + .get_physical_region_id(first_logical_region_id) .with_context(|| { - error!("Trying to alter an nonexistent region {logical_region_id}"); + error!("Trying to alter an nonexistent region {first_logical_region_id}"); LogicalRegionNotFoundSnafu { - region_id: logical_region_id, + region_id: first_logical_region_id, } })?; - - let index_options = state + let region_state = state .physical_region_states() .get(&physical_region_id) .with_context(|| PhysicalRegionNotFoundSnafu { region_id: physical_region_id, - })? - .options() - .index; + })?; + let physical_columns = region_state.physical_columns(); - (physical_region_id, index_options) - }; + extract_new_columns( + &requests, + physical_columns, + &mut new_column_names, + &mut new_columns_to_add, + )?; - // only handle adding column - let AlterKind::AddColumns { columns } = request.kind else { - return Ok(physical_region_id); + (physical_region_id, region_state.options().index) }; + let data_region_id = to_data_region_id(physical_region_id); - // lock metadata region for this logical region id - let _write_guard = self - .metadata_region - .write_lock_logical_region(logical_region_id) - .await; - - let metadata_region_id = to_metadata_region_id(physical_region_id); - let mut columns_to_add = vec![]; - // columns that already exist in physical region - let mut existing_columns = vec![]; - - let pre_existing_physical_columns = self - .data_region - .physical_columns(physical_region_id) - .await?; - - let pre_exist_cols = pre_existing_physical_columns - .iter() - .map(|col| (col.column_schema.name.as_str(), col)) - .collect::>(); - - // check pre-existing physical columns so if any columns to add is already exist, - // we can skip it in physical alter operation - // (but still need to update them in logical alter operation) - for col in &columns { - if let Some(exist_column) = - pre_exist_cols.get(&col.column_metadata.column_schema.name.as_str()) - { - // push the correct column schema with correct column id - existing_columns.push(*exist_column); - } else { - columns_to_add.push(col.column_metadata.clone()); + let mut write_guards = HashMap::with_capacity(requests.len()); + for (region_id, _) in requests.iter() { + if write_guards.contains_key(region_id) { + continue; } + let _write_guard = self + .metadata_region + .write_lock_logical_region(*region_id) + .await; + write_guards.insert(*region_id, _write_guard); } - // alter data region - let data_region_id = to_data_region_id(physical_region_id); - self.add_columns_to_physical_data_region( + add_columns_to_physical_data_region( data_region_id, - logical_region_id, - &mut columns_to_add, index_options, + &mut new_columns_to_add, + &self.data_region, ) .await?; - // note here we don't use `columns` directly but concat `existing_columns` with `columns_to_add` to get correct metadata - // about already existing columns - for metadata in existing_columns.into_iter().chain(columns_to_add.iter()) { - self.metadata_region - .add_column(metadata_region_id, logical_region_id, metadata) - .await?; - } + let physical_columns = self.data_region.physical_columns(data_region_id).await?; + let physical_schema_map = physical_columns + .iter() + .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) + .collect::>(); - // invalid logical column cache - self.state - .write() - .unwrap() - .invalid_logical_column_cache(logical_region_id); + let logical_region_columns = requests.iter().map(|(region_id, request)| { + let AlterKind::AddColumns { columns } = &request.kind else { + unreachable!() + }; + ( + *region_id, + columns + .iter() + .map(|col| { + let column_name = col.column_metadata.column_schema.name.as_str(); + let column_metadata = *physical_schema_map.get(column_name).unwrap(); + (column_name, column_metadata) + }) + .collect::>(), + ) + }); + + let new_add_columns = new_columns_to_add.iter().map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + ( + metadata.column_schema.name.to_string(), + column_metadata.column_id, + ) + }); + + // Writes logical regions metadata to metadata region + self.metadata_region + .add_logical_regions(physical_region_id, false, logical_region_columns) + .await?; + + extension_return_value.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?, + ); + + let mut state = self.state.write().unwrap(); + state.add_physical_columns(data_region_id, new_add_columns); + state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id)); - Ok(physical_region_id) + Ok(0) } async fn alter_physical_region( @@ -241,7 +260,7 @@ mod test { let region_id = env.default_logical_region_id(); engine_inner - .alter_logical_region(region_id, request) + .alter_logical_regions(vec![(region_id, request)], &mut HashMap::new()) .await .unwrap(); let semantic_type = metadata_region diff --git a/src/metric-engine/src/engine/alter/extract_new_columns.rs b/src/metric-engine/src/engine/alter/extract_new_columns.rs new file mode 100644 index 000000000000..fdb1ef6126e4 --- /dev/null +++ b/src/metric-engine/src/engine/alter/extract_new_columns.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use store_api::metadata::ColumnMetadata; +use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::storage::{ColumnId, RegionId}; + +use crate::error::Result; + +/// Extract new columns from the create requests. +/// +/// # Panics +/// +/// This function will panic if the alter kind is not `AddColumns`. +pub fn extract_new_columns<'a>( + requests: &'a [(RegionId, RegionAlterRequest)], + physical_columns: &HashMap, + new_column_names: &mut HashSet<&'a str>, + new_columns: &mut Vec, +) -> Result<()> { + for (_, request) in requests { + let AlterKind::AddColumns { columns } = &request.kind else { + unreachable!() + }; + for col in columns { + let column_name = col.column_metadata.column_schema.name.as_str(); + if !physical_columns.contains_key(column_name) + && !new_column_names.contains(column_name) + { + new_column_names.insert(column_name); + // TODO(weny): avoid clone + new_columns.push(col.column_metadata.clone()); + } + } + } + + Ok(()) +} diff --git a/src/metric-engine/src/engine/alter/validate.rs b/src/metric-engine/src/engine/alter/validate.rs new file mode 100644 index 000000000000..2e2d91eccfc6 --- /dev/null +++ b/src/metric-engine/src/engine/alter/validate.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; +use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::storage::RegionId; + +use crate::error::{Result, UnsupportedAlterKindSnafu}; + +/// Validate the alter region requests. +pub fn validate_alter_region_requests(requests: &[(RegionId, RegionAlterRequest)]) -> Result<()> { + for (_, request) in requests { + ensure!( + matches!(request.kind, AlterKind::AddColumns { .. }), + UnsupportedAlterKindSnafu { + kind: request.kind.as_ref() + } + ); + } + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index b76f22586eac..186fe9288c66 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -18,9 +18,9 @@ mod validate; use std::collections::{HashMap, HashSet}; -use add_columns::add_columns_to_physical_data_region; +pub(crate) use add_columns::add_columns_to_physical_data_region; use api::v1::SemanticType; -use common_telemetry::{info, warn}; +use common_telemetry::info; use common_time::{Timestamp, FOREVER}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -46,15 +46,15 @@ use store_api::storage::RegionId; use validate::validate_create_logical_regions; use crate::engine::create::extract_new_columns::extract_new_columns; -use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions}; +use crate::engine::options::{set_data_region_options, PhysicalRegionOptions}; use crate::engine::MetricEngineInner; use crate::error::{ - ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu, - EmptyRequestSnafu, InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, + ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu, + InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, }; -use crate::metrics::{PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; +use crate::metrics::PHYSICAL_REGION_COUNT; use crate::utils::{self, to_data_region_id, to_metadata_region_id}; impl MetricEngineInner { @@ -87,16 +87,8 @@ impl MetricEngineInner { .options .contains_key(LOGICAL_TABLE_METADATA_KEY) { - let physical_region_id = self.create_logical_regions(requests).await?; - let physical_columns = self - .data_region - .physical_columns(physical_region_id) + self.create_logical_regions(requests, extension_return_value) .await?; - extension_return_value.insert( - ALTER_PHYSICAL_EXTENSION_KEY.to_string(), - ColumnMetadata::encode_list(&physical_columns) - .context(SerializeColumnMetadataSnafu)?, - ); } else { return MissingRegionOptionSnafu {}.fail(); } @@ -162,14 +154,11 @@ impl MetricEngineInner { } /// Create multiple logical regions on the same physical region. - /// - /// Returns the physical region id of the created logical regions. async fn create_logical_regions( &self, requests: Vec<(RegionId, RegionCreateRequest)>, - ) -> Result { - ensure!(!requests.is_empty(), EmptyRequestSnafu {}); - + extension_return_value: &mut HashMap>, + ) -> Result<()> { let physical_region_id = validate_create_logical_regions(&requests)?; let data_region_id = utils::to_data_region_id(physical_region_id); @@ -225,7 +214,7 @@ impl MetricEngineInner { .iter() .map(|(region_id, _)| (*region_id)) .collect::>(); - let logical_regions_column_names = requests.iter().map(|(region_id, request)| { + let logical_region_columns = requests.iter().map(|(region_id, request)| { ( *region_id, request @@ -253,80 +242,20 @@ impl MetricEngineInner { ) }); + extension_return_value.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?, + ); + // Writes logical regions metadata to metadata region self.metadata_region - .add_logical_regions(physical_region_id, logical_regions_column_names) + .add_logical_regions(physical_region_id, true, logical_region_columns) .await?; let mut state = self.state.write().unwrap(); state.add_physical_columns(data_region_id, new_add_columns); state.add_logical_regions(physical_region_id, logical_regions); - Ok(data_region_id) - } - - /// Execute corresponding alter requests to mito region. After calling this, `new_columns` will be assign a new column id - /// which should be correct if the following requirements are met: - /// - /// # NOTE - /// - /// `new_columns` MUST NOT pre-exist in the physical region. Or the results will be wrong column id for the new columns. - /// - pub(crate) async fn add_columns_to_physical_data_region( - &self, - data_region_id: RegionId, - logical_region_id: RegionId, - new_columns: &mut [ColumnMetadata], - index_options: IndexOptions, - ) -> Result<()> { - // Return early if no new columns are added. - if new_columns.is_empty() { - return Ok(()); - } - - // alter data region - self.data_region - .add_columns(data_region_id, new_columns, index_options) - .await?; - - // correct the column id - let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?; - let after_alter_physical_schema_map = after_alter_physical_schema - .iter() - .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) - .collect::>(); - - // double check to make sure column ids are not mismatched - // shouldn't be a expensive operation, given it only query for physical columns - for col in new_columns.iter_mut() { - let column_metadata = after_alter_physical_schema_map - .get(&col.column_schema.name.as_str()) - .with_context(|| ColumnNotFoundSnafu { - name: &col.column_schema.name, - region_id: data_region_id, - })?; - if col != *column_metadata { - warn!( - "Add already existing columns with different column metadata to physical region({:?}): new column={:?}, old column={:?}", - data_region_id, - col, - column_metadata - ); - // update to correct metadata - *col = (*column_metadata).clone(); - } - } - - // safety: previous step has checked this - self.state.write().unwrap().add_physical_columns( - data_region_id, - new_columns - .iter() - .map(|meta| (meta.column_schema.name.clone(), meta.column_id)), - ); - info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); - PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); - Ok(()) } diff --git a/src/metric-engine/src/engine/create/validate.rs b/src/metric-engine/src/engine/create/validate.rs index df98294480fc..943e42af52c4 100644 --- a/src/metric-engine/src/engine/create/validate.rs +++ b/src/metric-engine/src/engine/create/validate.rs @@ -18,8 +18,7 @@ use store_api::region_request::RegionCreateRequest; use store_api::storage::RegionId; use crate::error::{ - ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, - Result, + ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result, }; /// Validate the create logical regions request. @@ -28,8 +27,6 @@ use crate::error::{ pub fn validate_create_logical_regions( requests: &[(RegionId, RegionCreateRequest)], ) -> Result { - ensure!(!requests.is_empty(), EmptyRequestSnafu {}); - let (_, request) = requests.first().unwrap(); let first_physical_region_id_raw = request .options diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 3c273372fe0b..42975e83e643 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -139,6 +139,15 @@ impl MetricEngineState { } } + pub fn invalid_logical_regions_cache( + &mut self, + logical_region_ids: impl IntoIterator, + ) { + for logical_region_id in logical_region_ids { + self.logical_columns.remove(&logical_region_id); + } + } + /// # Panic /// if the physical region does not exist pub fn add_logical_region( @@ -233,10 +242,6 @@ impl MetricEngineState { Ok(()) } - pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) { - self.logical_columns.remove(&logical_region_id); - } - pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool { self.logical_regions().contains_key(&logical_region_id) } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 4e082da305f0..de0d935ee081 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -218,6 +218,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Unsupported alter kind: {}", kind))] + UnsupportedAlterKind { + kind: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Multiple field column found: {} and {}", previous, current))] MultipleFieldColumn { previous: String, @@ -246,12 +253,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Empty request"))] - EmptyRequest { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Unexpected request: {}", reason))] UnexpectedRequest { reason: String, @@ -276,8 +277,8 @@ impl ErrorExt for Error { | NoFieldColumn { .. } | AddingFieldColumn { .. } | ParseRegionOptions { .. } - | EmptyRequest { .. } - | UnexpectedRequest { .. } => StatusCode::InvalidArguments, + | UnexpectedRequest { .. } + | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { StatusCode::Unsupported diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 71a3a0e3ce8d..55b933b2cf98 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -20,7 +20,6 @@ use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use base64::engine::general_purpose::STANDARD_NO_PAD; use base64::Engine; use common_recordbatch::util::collect; -use datafusion::prelude::{col, lit}; use mito2::engine::MitoEngine; use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -80,28 +79,6 @@ impl MetadataRegion { .insert(logical_region_id, Arc::new(RwLock::new(()))); } - /// Add a new column key to metadata. - /// - /// This method won't check if the column already exists. But - /// will return if the column is successfully added. - pub async fn add_column( - &self, - physical_region_id: RegionId, - logical_region_id: RegionId, - column_metadata: &ColumnMetadata, - ) -> Result { - let region_id = utils::to_metadata_region_id(physical_region_id); - let column_key = - Self::concat_column_key(logical_region_id, &column_metadata.column_schema.name); - - self.put_if_absent( - region_id, - column_key, - Self::serialize_column_metadata(column_metadata), - ) - .await - } - /// Retrieve a read lock guard of given logical region id. pub async fn read_lock_logical_region( &self, @@ -290,75 +267,6 @@ impl MetadataRegion { // // methods in this block assume the given region id is transformed. impl MetadataRegion { - /// Put if not exist, return if this put operation is successful (error other - /// than "key already exist" will be wrapped in [Err]). - pub async fn put_if_absent( - &self, - region_id: RegionId, - key: String, - value: String, - ) -> Result { - if self.exists(region_id, &key).await? { - return Ok(false); - } - - let put_request = Self::build_put_request(&key, &value); - self.mito - .handle_request( - region_id, - store_api::region_request::RegionRequest::Put(put_request), - ) - .await - .context(MitoWriteOperationSnafu)?; - Ok(true) - } - - /// Check if the given key exists. - /// - /// Notice that due to mito doesn't support transaction, TOCTTOU is possible. - pub async fn exists(&self, region_id: RegionId, key: &str) -> Result { - let scan_req = Self::build_read_request(key); - let record_batch_stream = self - .mito - .scan_to_stream(region_id, scan_req) - .await - .context(MitoReadOperationSnafu)?; - let scan_result = collect(record_batch_stream) - .await - .context(CollectRecordBatchStreamSnafu)?; - - let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0; - Ok(exist) - } - - /// Retrieves the value associated with the given key in the specified region. - /// Returns `Ok(None)` if the key is not found. - #[cfg(test)] - pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { - let scan_req = Self::build_read_request(key); - let record_batch_stream = self - .mito - .scan_to_stream(region_id, scan_req) - .await - .context(MitoReadOperationSnafu)?; - let scan_result = collect(record_batch_stream) - .await - .context(CollectRecordBatchStreamSnafu)?; - - let Some(first_batch) = scan_result.first() else { - return Ok(None); - }; - - let val = first_batch - .column(0) - .get_ref(0) - .as_string() - .unwrap() - .map(|s| s.to_string()); - - Ok(val) - } - /// Load all metadata from a given region. pub async fn get_all(&self, region_id: RegionId) -> Result> { let scan_req = ScanRequest { @@ -416,23 +324,6 @@ impl MetadataRegion { Ok(()) } - /// Builds a [ScanRequest] to read metadata for a given key. - /// The request will contains a EQ filter on the key column. - /// - /// Only the value column is projected. - fn build_read_request(key: &str) -> ScanRequest { - let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key)); - - ScanRequest { - projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), - filters: vec![filter_expr], - output_ordering: None, - limit: None, - series_row_selector: None, - sequence: None, - } - } - pub(crate) fn build_put_request_from_iter( kv: impl Iterator, ) -> RegionPutRequest { @@ -479,47 +370,6 @@ impl MetadataRegion { RegionPutRequest { rows, hint: None } } - fn build_put_request(key: &str, value: &str) -> RegionPutRequest { - let cols = vec![ - ColumnSchema { - column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, - semantic_type: SemanticType::Timestamp as _, - ..Default::default() - }, - ColumnSchema { - column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), - datatype: ColumnDataType::String as _, - semantic_type: SemanticType::Tag as _, - ..Default::default() - }, - ColumnSchema { - column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), - datatype: ColumnDataType::String as _, - semantic_type: SemanticType::Field as _, - ..Default::default() - }, - ]; - let rows = Rows { - schema: cols, - rows: vec![Row { - values: vec![ - Value { - value_data: Some(ValueData::TimestampMillisecondValue(0)), - }, - Value { - value_data: Some(ValueData::StringValue(key.to_string())), - }, - Value { - value_data: Some(ValueData::StringValue(value.to_string())), - }, - ], - }], - }; - - RegionPutRequest { rows, hint: None } - } - fn build_delete_request(keys: &[String]) -> RegionDeleteRequest { let cols = vec![ ColumnSchema { @@ -557,16 +407,21 @@ impl MetadataRegion { pub async fn add_logical_regions( &self, physical_region_id: RegionId, + write_region_id: bool, logical_regions: impl Iterator)>, ) -> Result<()> { let region_id = utils::to_metadata_region_id(physical_region_id); let iter = logical_regions .into_iter() .flat_map(|(logical_region_id, column_metadatas)| { - Some(( - MetadataRegion::concat_region_key(logical_region_id), - String::new(), - )) + if write_region_id { + Some(( + MetadataRegion::concat_region_key(logical_region_id), + String::new(), + )) + } else { + None + } .into_iter() .chain(column_metadatas.into_iter().map( move |(name, column_metadata)| { @@ -592,6 +447,136 @@ impl MetadataRegion { } } +#[cfg(test)] +impl MetadataRegion { + fn build_put_request(key: &str, value: &str) -> RegionPutRequest { + let cols = vec![ + ColumnSchema { + column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Tag as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Field as _, + ..Default::default() + }, + ]; + let rows = Rows { + schema: cols, + rows: vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + Value { + value_data: Some(ValueData::StringValue(value.to_string())), + }, + ], + }], + }; + + RegionPutRequest { rows, hint: None } + } + + /// Check if the given key exists. + /// + /// Notice that due to mito doesn't support transaction, TOCTTOU is possible. + pub async fn exists(&self, region_id: RegionId, key: &str) -> Result { + let scan_req = Self::build_read_request(key); + let record_batch_stream = self + .mito + .scan_to_stream(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + let scan_result = collect(record_batch_stream) + .await + .context(CollectRecordBatchStreamSnafu)?; + + let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0; + Ok(exist) + } + + /// Put if not exist, return if this put operation is successful (error other + /// than "key already exist" will be wrapped in [Err]). + pub async fn put_if_absent( + &self, + region_id: RegionId, + key: String, + value: String, + ) -> Result { + if self.exists(region_id, &key).await? { + return Ok(false); + } + + let put_request = Self::build_put_request(&key, &value); + self.mito + .handle_request( + region_id, + store_api::region_request::RegionRequest::Put(put_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + Ok(true) + } + + /// Retrieves the value associated with the given key in the specified region. + /// Returns `Ok(None)` if the key is not found. + pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { + let scan_req = Self::build_read_request(key); + let record_batch_stream = self + .mito + .scan_to_stream(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + let scan_result = collect(record_batch_stream) + .await + .context(CollectRecordBatchStreamSnafu)?; + + let Some(first_batch) = scan_result.first() else { + return Ok(None); + }; + + let val = first_batch + .column(0) + .get_ref(0) + .as_string() + .unwrap() + .map(|s| s.to_string()); + + Ok(val) + } + + /// Builds a [ScanRequest] to read metadata for a given key. + /// The request will contains a EQ filter on the key column. + /// + /// Only the value column is projected. + fn build_read_request(key: &str) -> ScanRequest { + let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) + .eq(datafusion::prelude::lit(key)); + + ScanRequest { + projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), + filters: vec![filter_expr], + output_ordering: None, + limit: None, + series_row_selector: None, + sequence: None, + } + } +} + #[cfg(test)] mod test { use datatypes::data_type::ConcreteDataType; @@ -668,7 +653,8 @@ mod test { #[test] fn test_build_read_request() { let key = "test_key"; - let expected_filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key)); + let expected_filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) + .eq(datafusion::prelude::lit(key)); let expected_scan_request = ScanRequest { projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), filters: vec![expected_filter_expr], @@ -767,48 +753,6 @@ mod test { assert_eq!(result.unwrap(), Some(value)); } - #[tokio::test] - async fn test_add_column() { - let env = TestEnv::new().await; - env.init_metric_region().await; - let metadata_region = env.metadata_region(); - let physical_region_id = to_metadata_region_id(env.default_physical_region_id()); - - let logical_region_id = RegionId::new(868, 8390); - let column_name = "column1"; - let semantic_type = SemanticType::Tag; - let column_metadata = ColumnMetadata { - column_schema: ColumnSchema::new( - column_name, - ConcreteDataType::string_datatype(), - false, - ), - semantic_type, - column_id: 5, - }; - metadata_region - .add_column(physical_region_id, logical_region_id, &column_metadata) - .await - .unwrap(); - let actual_semantic_type = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, column_name) - .await - .unwrap(); - assert_eq!(actual_semantic_type, Some(semantic_type)); - - // duplicate column won't be updated - let is_updated = metadata_region - .add_column(physical_region_id, logical_region_id, &column_metadata) - .await - .unwrap(); - assert!(!is_updated); - let actual_semantic_type = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, column_name) - .await - .unwrap(); - assert_eq!(actual_semantic_type, Some(semantic_type)); - } - fn test_column_metadatas() -> HashMap { HashMap::from([ ( @@ -855,7 +799,7 @@ mod test { .collect::>(), )]; metadata_region - .add_logical_regions(physical_region_id, iter.into_iter()) + .add_logical_regions(physical_region_id, true, iter.into_iter()) .await .unwrap(); // Add logical region again. @@ -867,7 +811,7 @@ mod test { .collect::>(), )]; metadata_region - .add_logical_regions(physical_region_id, iter.into_iter()) + .add_logical_regions(physical_region_id, true, iter.into_iter()) .await .unwrap(); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 58afdaf128b5..af82cd1deb86 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -30,7 +30,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::schema::FulltextOptions; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use strum::IntoStaticStr; +use strum::{AsRefStr, IntoStaticStr}; use crate::logstore::entry; use crate::metadata::{ @@ -475,7 +475,7 @@ impl TryFrom for RegionAlterRequest { } /// Kind of the alteration. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)] pub enum AlterKind { /// Add columns to the region. AddColumns { From 36ddeb2191073c09ec1eea6a79d2afa04f39ce96 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 12 Feb 2025 09:20:00 +0000 Subject: [PATCH 2/4] refactor: minor refactor --- src/metric-engine/src/data_region.rs | 59 ++++++++----------- src/metric-engine/src/engine/alter.rs | 22 ++----- src/metric-engine/src/engine/create.rs | 23 ++------ .../src/engine/create/add_columns.rs | 42 ------------- 4 files changed, 38 insertions(+), 108 deletions(-) delete mode 100644 src/metric-engine/src/engine/create/add_columns.rs diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index f903587d1087..22ce8f526186 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,8 +13,6 @@ // limitations under the License. use api::v1::SemanticType; -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; use common_telemetry::{debug, info, warn}; use datatypes::schema::{SkippingIndexOptions, SkippingIndexType}; use mito2::engine::MitoEngine; @@ -32,11 +30,9 @@ use crate::error::{ ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu, }; -use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION}; +use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT}; use crate::utils; -const MAX_RETRIES: usize = 5; - /// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It /// will handle all the data related operations across physical tables. Thus /// every operation should be associated to a [RegionId], which is the physical @@ -65,33 +61,30 @@ impl DataRegion { pub async fn add_columns( &self, region_id: RegionId, - columns: &mut [ColumnMetadata], + columns: Vec, index_options: IndexOptions, ) -> Result<()> { + // Return early if no new columns are added. + if columns.is_empty() { + return Ok(()); + } + let region_id = utils::to_data_region_id(region_id); - let mut retries = 0; - // submit alter request - while retries < MAX_RETRIES { - let request = self - .assemble_alter_request(region_id, columns, index_options) - .await?; + let num_columns = columns.len(); + let request = self + .assemble_alter_request(region_id, columns, index_options) + .await?; - let _timer = MITO_DDL_DURATION.start_timer(); + let _timer = MITO_DDL_DURATION.start_timer(); - let result = self.mito.handle_request(region_id, request).await; - match result { - Ok(_) => return Ok(()), - Err(e) if e.status_code() == StatusCode::RequestOutdated => { - info!("Retrying alter {region_id} due to outdated schema version, times {retries}"); - retries += 1; - continue; - } - Err(e) => { - return Err(e).context(MitoWriteOperationSnafu)?; - } - } - } + let _ = self + .mito + .handle_request(region_id, request) + .await + .context(MitoWriteOperationSnafu)?; + + PHYSICAL_COLUMN_COUNT.add(num_columns as _); Ok(()) } @@ -101,7 +94,7 @@ impl DataRegion { async fn assemble_alter_request( &self, region_id: RegionId, - columns: &mut [ColumnMetadata], + columns: Vec, index_options: IndexOptions, ) -> Result { // retrieve underlying version @@ -128,9 +121,9 @@ impl DataRegion { // overwrite semantic type let new_columns = columns - .iter_mut() + .into_iter() .enumerate() - .map(|(delta, c)| { + .map(|(delta, mut c)| { if c.semantic_type == SemanticType::Tag { if !c.column_schema.data_type.is_string() { return ColumnTypeMismatchSnafu { @@ -254,7 +247,7 @@ mod test { // TestEnv will create a logical region which changes the version to 1. assert_eq!(current_version, 1); - let mut new_columns = vec![ + let new_columns = vec![ ColumnMetadata { column_id: 0, semantic_type: SemanticType::Tag, @@ -277,7 +270,7 @@ mod test { env.data_region() .add_columns( env.default_physical_region_id(), - &mut new_columns, + new_columns, IndexOptions::Inverted, ) .await @@ -311,7 +304,7 @@ mod test { let env = TestEnv::new().await; env.init_metric_region().await; - let mut new_columns = vec![ColumnMetadata { + let new_columns = vec![ColumnMetadata { column_id: 0, semantic_type: SemanticType::Tag, column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false), @@ -320,7 +313,7 @@ mod test { .data_region() .add_columns( env.default_physical_region_id(), - &mut new_columns, + new_columns, IndexOptions::Inverted, ) .await; diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 8822572d2a3a..0fbe3181e839 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -26,7 +26,6 @@ use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest}; use store_api::storage::RegionId; use validate::validate_alter_region_requests; -use crate::engine::create::add_columns_to_physical_data_region; use crate::engine::MetricEngineInner; use crate::error::{ LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, @@ -116,13 +115,9 @@ impl MetricEngineInner { write_guards.insert(*region_id, _write_guard); } - add_columns_to_physical_data_region( - data_region_id, - index_options, - &mut new_columns_to_add, - &self.data_region, - ) - .await?; + self.data_region + .add_columns(data_region_id, new_columns_to_add, index_options) + .await?; let physical_columns = self.data_region.physical_columns(data_region_id).await?; let physical_schema_map = physical_columns @@ -147,15 +142,10 @@ impl MetricEngineInner { ) }); - let new_add_columns = new_columns_to_add.iter().map(|metadata| { + let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist - let column_metadata = *physical_schema_map - .get(metadata.column_schema.name.as_str()) - .unwrap(); - ( - metadata.column_schema.name.to_string(), - column_metadata.column_id, - ) + let column_metadata = *physical_schema_map.get(name).unwrap(); + (name.to_string(), column_metadata.column_id) }); // Writes logical regions metadata to metadata region diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 186fe9288c66..8ebd29be23ad 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod add_columns; mod extract_new_columns; mod validate; use std::collections::{HashMap, HashSet}; -pub(crate) use add_columns::add_columns_to_physical_data_region; use api::v1::SemanticType; use common_telemetry::info; use common_time::{Timestamp, FOREVER}; @@ -197,13 +195,9 @@ impl MetricEngineInner { }; // TODO(weny): we dont need to pass a mutable new_columns here. - add_columns_to_physical_data_region( - data_region_id, - index_option, - &mut new_columns, - &self.data_region, - ) - .await?; + self.data_region + .add_columns(data_region_id, new_columns, index_option) + .await?; let physical_columns = self.data_region.physical_columns(data_region_id).await?; let physical_schema_map = physical_columns @@ -231,15 +225,10 @@ impl MetricEngineInner { ) }); - let new_add_columns = new_columns.iter().map(|metadata| { + let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist - let column_metadata = *physical_schema_map - .get(metadata.column_schema.name.as_str()) - .unwrap(); - ( - metadata.column_schema.name.to_string(), - column_metadata.column_id, - ) + let column_metadata = *physical_schema_map.get(name).unwrap(); + (name.to_string(), column_metadata.column_id) }); extension_return_value.insert( diff --git a/src/metric-engine/src/engine/create/add_columns.rs b/src/metric-engine/src/engine/create/add_columns.rs deleted file mode 100644 index 78c66ac9ee92..000000000000 --- a/src/metric-engine/src/engine/create/add_columns.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use store_api::metadata::ColumnMetadata; -use store_api::storage::RegionId; - -use crate::data_region::DataRegion; -use crate::engine::IndexOptions; -use crate::error::Result; -use crate::metrics::PHYSICAL_COLUMN_COUNT; - -/// Add new columns to the physical data region. -pub(crate) async fn add_columns_to_physical_data_region( - data_region_id: RegionId, - index_options: IndexOptions, - new_columns: &mut [ColumnMetadata], - data_region: &DataRegion, -) -> Result<()> { - // Return early if no new columns are added. - if new_columns.is_empty() { - return Ok(()); - } - - data_region - .add_columns(data_region_id, new_columns, index_options) - .await?; - - PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); - - Ok(()) -} From cba0bae03e78373b4dec2b198a95a68b9219a1ea Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 12 Feb 2025 11:39:02 +0000 Subject: [PATCH 3/4] refactor: push down filter to mito --- Cargo.lock | 2 + src/metric-engine/Cargo.toml | 2 + src/metric-engine/src/metadata_region.rs | 399 ++++++++--------------- 3 files changed, 141 insertions(+), 262 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bdb317b20284..0871fac04561 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6781,6 +6781,7 @@ version = "0.12.0" dependencies = [ "api", "aquamarine", + "async-stream", "async-trait", "base64 0.21.7", "common-base", @@ -6793,6 +6794,7 @@ dependencies = [ "common-time", "datafusion", "datatypes", + "futures-util", "itertools 0.10.5", "lazy_static", "mito2", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index f8d21c35d400..5fe5ed3cb5a0 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] api.workspace = true aquamarine.workspace = true +async-stream.workspace = true async-trait.workspace = true base64.workspace = true common-base.workspace = true @@ -21,6 +22,7 @@ common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true datatypes.workspace = true +futures-util.workspace = true itertools.workspace = true lazy_static = "1.4" mito2.workspace = true diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 55b933b2cf98..753251c72bb9 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -17,9 +17,13 @@ use std::sync::Arc; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; +use async_stream::try_stream; use base64::engine::general_purpose::STANDARD_NO_PAD; use base64::Engine; -use common_recordbatch::util::collect; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use datafusion::prelude::{col, lit}; +use futures_util::stream::BoxStream; +use futures_util::TryStreamExt; use mito2::engine::MitoEngine; use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -146,22 +150,6 @@ impl MetadataRegion { Ok(()) } - /// Check if the given column exists. Return the semantic type if exists. - #[cfg(test)] - pub async fn column_semantic_type( - &self, - physical_region_id: RegionId, - logical_region_id: RegionId, - column_name: &str, - ) -> Result> { - let region_id = utils::to_metadata_region_id(physical_region_id); - let column_key = Self::concat_column_key(logical_region_id, column_name); - let semantic_type = self.get(region_id, &column_key).await?; - semantic_type - .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type)) - .transpose() - } - // TODO(ruihang): avoid using `get_all` /// Get all the columns of a given logical region. /// Return a list of (column_name, column_metadata). @@ -174,7 +162,10 @@ impl MetadataRegion { let region_column_prefix = Self::concat_column_key_prefix(logical_region_id); let mut columns = vec![]; - for (k, v) in self.get_all(metadata_region_id).await? { + for (k, v) in self + .get_all_with_prefix(metadata_region_id, ®ion_column_prefix) + .await? + { if !k.starts_with(®ion_column_prefix) { continue; } @@ -191,7 +182,10 @@ impl MetadataRegion { let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let mut regions = vec![]; - for (k, _) in self.get_all(metadata_region_id).await? { + for k in self + .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX) + .await? + { if !k.starts_with(REGION_PREFIX) { continue; } @@ -263,51 +257,122 @@ impl MetadataRegion { } } +/// Decode a record batch stream to a stream of items. +pub fn decode_batch_stream( + mut record_batch_stream: SendableRecordBatchStream, + decode: fn(RecordBatch) -> Vec, +) -> BoxStream<'static, Result> { + let stream = try_stream! { + while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? { + for item in decode(batch) { + yield item; + } + } + }; + Box::pin(stream) +} + +/// Decode a record batch to a list of key and value. +fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> { + let key_col = batch.column(0); + let val_col = batch.column(1); + + (0..batch.num_rows()) + .flat_map(move |row_index| { + let key = key_col + .get_ref(row_index) + .as_string() + .unwrap() + .map(|s| s.to_string()); + + key.map(|k| { + ( + k, + val_col + .get_ref(row_index) + .as_string() + .unwrap() + .map(|s| s.to_string()) + .unwrap_or_default(), + ) + }) + }) + .collect() +} + +/// Decode a record batch to a list of key. +fn decode_record_batch_to_key(batch: RecordBatch) -> Vec { + let key_col = batch.column(0); + + (0..batch.num_rows()) + .flat_map(move |row_index| { + let key = key_col + .get_ref(row_index) + .as_string() + .unwrap() + .map(|s| s.to_string()); + key + }) + .collect() +} + // simulate to `KvBackend` // // methods in this block assume the given region id is transformed. impl MetadataRegion { - /// Load all metadata from a given region. - pub async fn get_all(&self, region_id: RegionId) -> Result> { - let scan_req = ScanRequest { - projection: Some(vec![ + fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest { + let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix)); + + let projection = if key_only { + vec![METADATA_SCHEMA_KEY_COLUMN_INDEX] + } else { + vec![ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_INDEX, - ]), - filters: vec![], + ] + }; + ScanRequest { + projection: Some(projection), + filters: vec![filter_expr], output_ordering: None, limit: None, series_row_selector: None, sequence: None, - }; + } + } + + pub async fn get_all_with_prefix( + &self, + region_id: RegionId, + prefix: &str, + ) -> Result> { + let scan_req = MetadataRegion::build_prefix_read_request(prefix, false); let record_batch_stream = self .mito .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; - let scan_result = collect(record_batch_stream) + + decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value) + .try_collect::>() .await - .context(CollectRecordBatchStreamSnafu)?; + } - let mut result = HashMap::new(); - for batch in scan_result { - let key_col = batch.column(0); - let val_col = batch.column(1); - for row_index in 0..batch.num_rows() { - let key = key_col - .get_ref(row_index) - .as_string() - .unwrap() - .map(|s| s.to_string()); - let val = val_col - .get_ref(row_index) - .as_string() - .unwrap() - .map(|s| s.to_string()); - result.insert(key.unwrap(), val.unwrap_or_default()); - } - } - Ok(result) + pub async fn get_all_key_with_prefix( + &self, + region_id: RegionId, + prefix: &str, + ) -> Result> { + let scan_req = MetadataRegion::build_prefix_read_request(prefix, true); + let record_batch_stream = self + .mito + .scan_to_stream(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + + decode_batch_stream(record_batch_stream, decode_record_batch_to_key) + .try_collect::>() + .await } /// Delete the given keys. For performance consideration, this method @@ -449,98 +514,26 @@ impl MetadataRegion { #[cfg(test)] impl MetadataRegion { - fn build_put_request(key: &str, value: &str) -> RegionPutRequest { - let cols = vec![ - ColumnSchema { - column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, - semantic_type: SemanticType::Timestamp as _, - ..Default::default() - }, - ColumnSchema { - column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), - datatype: ColumnDataType::String as _, - semantic_type: SemanticType::Tag as _, - ..Default::default() - }, - ColumnSchema { - column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), - datatype: ColumnDataType::String as _, - semantic_type: SemanticType::Field as _, - ..Default::default() - }, - ]; - let rows = Rows { - schema: cols, - rows: vec![Row { - values: vec![ - Value { - value_data: Some(ValueData::TimestampMillisecondValue(0)), - }, - Value { - value_data: Some(ValueData::StringValue(key.to_string())), - }, - Value { - value_data: Some(ValueData::StringValue(value.to_string())), - }, - ], - }], - }; - - RegionPutRequest { rows, hint: None } - } - - /// Check if the given key exists. - /// - /// Notice that due to mito doesn't support transaction, TOCTTOU is possible. - pub async fn exists(&self, region_id: RegionId, key: &str) -> Result { - let scan_req = Self::build_read_request(key); - let record_batch_stream = self - .mito - .scan_to_stream(region_id, scan_req) - .await - .context(MitoReadOperationSnafu)?; - let scan_result = collect(record_batch_stream) - .await - .context(CollectRecordBatchStreamSnafu)?; - - let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0; - Ok(exist) - } - - /// Put if not exist, return if this put operation is successful (error other - /// than "key already exist" will be wrapped in [Err]). - pub async fn put_if_absent( - &self, - region_id: RegionId, - key: String, - value: String, - ) -> Result { - if self.exists(region_id, &key).await? { - return Ok(false); - } - - let put_request = Self::build_put_request(&key, &value); - self.mito - .handle_request( - region_id, - store_api::region_request::RegionRequest::Put(put_request), - ) - .await - .context(MitoWriteOperationSnafu)?; - Ok(true) - } - /// Retrieves the value associated with the given key in the specified region. /// Returns `Ok(None)` if the key is not found. pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { - let scan_req = Self::build_read_request(key); + let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) + .eq(datafusion::prelude::lit(key)); + + let scan_req = ScanRequest { + projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), + filters: vec![filter_expr], + output_ordering: None, + limit: None, + series_row_selector: None, + sequence: None, + }; let record_batch_stream = self .mito .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; - let scan_result = collect(record_batch_stream) + let scan_result = common_recordbatch::util::collect(record_batch_stream) .await .context(CollectRecordBatchStreamSnafu)?; @@ -558,22 +551,19 @@ impl MetadataRegion { Ok(val) } - /// Builds a [ScanRequest] to read metadata for a given key. - /// The request will contains a EQ filter on the key column. - /// - /// Only the value column is projected. - fn build_read_request(key: &str) -> ScanRequest { - let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) - .eq(datafusion::prelude::lit(key)); - - ScanRequest { - projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), - filters: vec![filter_expr], - output_ordering: None, - limit: None, - series_row_selector: None, - sequence: None, - } + /// Check if the given column exists. Return the semantic type if exists. + pub async fn column_semantic_type( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + column_name: &str, + ) -> Result> { + let region_id = utils::to_metadata_region_id(physical_region_id); + let column_key = Self::concat_column_key(logical_region_id, column_name); + let semantic_type = self.get(region_id, &column_key).await?; + semantic_type + .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type)) + .transpose() } } @@ -581,7 +571,6 @@ impl MetadataRegion { mod test { use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; - use store_api::region_request::RegionRequest; use super::*; use crate::test_util::TestEnv; @@ -650,109 +639,6 @@ mod test { assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err()); } - #[test] - fn test_build_read_request() { - let key = "test_key"; - let expected_filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) - .eq(datafusion::prelude::lit(key)); - let expected_scan_request = ScanRequest { - projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), - filters: vec![expected_filter_expr], - output_ordering: None, - limit: None, - series_row_selector: None, - sequence: None, - }; - let actual_scan_request = MetadataRegion::build_read_request(key); - assert_eq!(actual_scan_request, expected_scan_request); - } - - #[tokio::test] - async fn test_put_conditionally() { - let env = TestEnv::new().await; - env.init_metric_region().await; - let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_physical_region_id()); - - // Test inserting a new key-value pair - let key = "test_key".to_string(); - let value = "test_value".to_string(); - let result = metadata_region - .put_if_absent(region_id, key.clone(), value.clone()) - .await; - assert!(result.is_ok()); - assert!(result.unwrap()); - - // Verify that the key-value pair was actually inserted - let scan_req = MetadataRegion::build_read_request("test_key"); - let record_batch_stream = metadata_region - .mito - .scan_to_stream(region_id, scan_req) - .await - .unwrap(); - let scan_result = collect(record_batch_stream).await.unwrap(); - assert_eq!(scan_result.len(), 1); - - // Test inserting the same key-value pair again - let result = metadata_region - .put_if_absent(region_id, key.clone(), value.clone()) - .await; - assert!(result.is_ok()); - assert!(!result.unwrap(),); - } - - #[tokio::test] - async fn test_exist() { - let env = TestEnv::new().await; - env.init_metric_region().await; - let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_physical_region_id()); - - // Test checking for a non-existent key - let key = "test_key".to_string(); - let result = metadata_region.exists(region_id, &key).await; - assert!(result.is_ok()); - assert!(!result.unwrap()); - - // Test inserting a key and then checking for its existence - let value = "test_value".to_string(); - let put_request = MetadataRegion::build_put_request(&key, &value); - metadata_region - .mito - .handle_request(region_id, RegionRequest::Put(put_request)) - .await - .unwrap(); - let result = metadata_region.exists(region_id, &key).await; - assert!(result.is_ok()); - assert!(result.unwrap(),); - } - - #[tokio::test] - async fn test_get() { - let env = TestEnv::new().await; - env.init_metric_region().await; - let metadata_region = env.metadata_region(); - let region_id = to_metadata_region_id(env.default_physical_region_id()); - - // Test getting a non-existent key - let key = "test_key".to_string(); - let result = metadata_region.get(region_id, &key).await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), None); - - // Test inserting a key and then getting its value - let value = "test_value".to_string(); - let put_request = MetadataRegion::build_put_request(&key, &value); - metadata_region - .mito - .handle_request(region_id, RegionRequest::Put(put_request)) - .await - .unwrap(); - let result = metadata_region.get(region_id, &key).await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), Some(value)); - } - fn test_column_metadatas() -> HashMap { HashMap::from([ ( @@ -821,17 +707,6 @@ mod test { .await .unwrap(); assert_eq!(logical_regions.len(), 2); - assert_eq!(logical_regions[1], logical_region_id); - - // Check if the logical region exists. - let result = metadata_region - .exists( - physical_region_id, - &MetadataRegion::concat_region_key(logical_region_id), - ) - .await - .unwrap(); - assert!(result); // Check if the logical region columns are added. let logical_columns = metadata_region From 73ce8a1c9c75586af3f00dc8068ad3c9918c5382 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 14 Feb 2025 07:44:32 +0000 Subject: [PATCH 4/4] chore: apply suggestions from CR --- src/metric-engine/src/engine/alter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 0fbe3181e839..9bc5f56251c0 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -66,6 +66,7 @@ impl MetricEngineInner { requests: Vec<(RegionId, RegionAlterRequest)>, extension_return_value: &mut HashMap>, ) -> Result { + // Checks all alter requests are add columns. validate_alter_region_requests(&requests)?; let first_logical_region_id = requests[0].0;