Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize table alteration speed in metric engine #5526

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/metric-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
aquamarine.workspace = true
async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
common-base.workspace = true
Expand All @@ -21,6 +22,7 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito2.workspace = true
Expand Down
59 changes: 26 additions & 33 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.

use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
Expand All @@ -32,11 +30,9 @@ use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
use crate::utils;

const MAX_RETRIES: usize = 5;

/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
Expand Down Expand Up @@ -65,33 +61,30 @@ impl DataRegion {
pub async fn add_columns(
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<()> {
// Return early if no new columns are added.
if columns.is_empty() {
return Ok(());
}

let region_id = utils::to_data_region_id(region_id);

let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;
let num_columns = columns.len();
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;

let _timer = MITO_DDL_DURATION.start_timer();
let _timer = MITO_DDL_DURATION.start_timer();

let result = self.mito.handle_request(region_id, request).await;
match result {
Ok(_) => return Ok(()),
Err(e) if e.status_code() == StatusCode::RequestOutdated => {
info!("Retrying alter {region_id} due to outdated schema version, times {retries}");
retries += 1;
continue;
}
Err(e) => {
return Err(e).context(MitoWriteOperationSnafu)?;
}
}
}
let _ = self
.mito
.handle_request(region_id, request)
.await
.context(MitoWriteOperationSnafu)?;

PHYSICAL_COLUMN_COUNT.add(num_columns as _);

Ok(())
}
Expand All @@ -101,7 +94,7 @@ impl DataRegion {
async fn assemble_alter_request(
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<RegionRequest> {
// retrieve underlying version
Expand All @@ -128,9 +121,9 @@ impl DataRegion {

// overwrite semantic type
let new_columns = columns
.iter_mut()
.into_iter()
.enumerate()
.map(|(delta, c)| {
.map(|(delta, mut c)| {
if c.semantic_type == SemanticType::Tag {
if !c.column_schema.data_type.is_string() {
return ColumnTypeMismatchSnafu {
Expand Down Expand Up @@ -254,7 +247,7 @@ mod test {
// TestEnv will create a logical region which changes the version to 1.
assert_eq!(current_version, 1);

let mut new_columns = vec![
let new_columns = vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
Expand All @@ -277,7 +270,7 @@ mod test {
env.data_region()
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
new_columns,
IndexOptions::Inverted,
)
.await
Expand Down Expand Up @@ -311,7 +304,7 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;

let mut new_columns = vec![ColumnMetadata {
let new_columns = vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
Expand All @@ -320,7 +313,7 @@ mod test {
.data_region()
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
new_columns,
IndexOptions::Inverted,
)
.await;
Expand Down
19 changes: 12 additions & 7 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,17 @@ impl RegionEngine for MetricEngine {
})
}
BatchRegionDdlRequest::Alter(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.alter_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;

Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionDdlRequest::Drop(requests) => {
self.handle_requests(
Expand Down Expand Up @@ -184,7 +189,7 @@ impl RegionEngine for MetricEngine {
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => {
self.inner
.alter_region(region_id, alter, &mut extension_return_value)
.alter_regions(vec![(region_id, alter)], &mut extension_return_value)
.await
}
RegionRequest::Compact(_) => {
Expand Down
Loading
Loading