Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): change sink type to append only when plan is append-only #20558

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
emit on window close
WITH (connector = 'blackhole');
explain_output: |
StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], downstream_pk: [] }
StreamSink { type: append-only, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum] }
└─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] }
└─StreamEowcSort { sort_column: t.tm }
└─StreamExchange { dist: HashShard(t.bar) }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
create table t2 (a int, b int, watermark for b as b - 4) append only;
explain create sink sk1 from t2 emit on window close with (connector='blackhole');
explain_output: |
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], downstream_pk: [] }
StreamSink { type: append-only, columns: [a, b, t2._row_id(hidden)] }
└─StreamEowcSort { sort_column: t2.b }
└─StreamTableScan { table: t2, columns: [a, b, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_sparse_partition
Expand Down
76 changes: 47 additions & 29 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,13 @@ impl StreamSink {
Ok(Self::new(input, sink_desc, log_store_type))
}

fn is_user_defined_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> {
if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) {
if sink_type != SINK_TYPE_APPEND_ONLY
&& sink_type != SINK_TYPE_DEBEZIUM
&& sink_type != SINK_TYPE_UPSERT
{
if sink_type == SINK_TYPE_APPEND_ONLY {
return Ok(Some(SinkType::AppendOnly));
} else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT {
return Ok(Some(SinkType::Upsert));
} else {
return Err(ErrorCode::SinkError(Box::new(Error::new(
ErrorKind::InvalidInput,
format!(
Expand All @@ -439,7 +440,7 @@ impl StreamSink {
.into());
}
}
Ok(properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY))
Ok(None)
}

fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
Expand All @@ -465,46 +466,63 @@ impl StreamSink {
format_desc: Option<&SinkFormatDesc>,
) -> Result<SinkType> {
let frontend_derived_append_only = input_append_only;
let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc {
let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc {
Some(f) => (
f.format == SinkFormat::AppendOnly,
Some(match f.format {
SinkFormat::AppendOnly => SinkType::AppendOnly,
SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert,
}),
Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets(
f.options.clone(),
))?,
false,
),
None => (
Self::is_user_defined_append_only(properties)?,
Self::sink_type_in_prop(properties)?,
Self::is_user_force_append_only(properties)?,
true,
),
};

match (
frontend_derived_append_only,
user_defined_append_only,
user_force_append_only,
) {
(true, true, _) => Ok(SinkType::AppendOnly),
(false, true, true) => Ok(SinkType::ForceAppendOnly),
(_, false, false) => Ok(SinkType::Upsert),
(false, true, false) => {
Err(ErrorCode::SinkError(Box::new(Error::new(
if let Some(user_defined_sink_type) = user_defined_sink_type {
if user_defined_sink_type == SinkType::AppendOnly {
if user_force_append_only {
return Ok(SinkType::ForceAppendOnly);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When upstream is append-only, and user specifies both append-only and force-append-only, the SinkType changed from AppendOnly to ForceAppendOnly.

}
if !frontend_derived_append_only {
return Err(ErrorCode::SinkError(Box::new(Error::new(
ErrorKind::InvalidInput,
format!(
"The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
),
)))
.into());
} else {
return Ok(SinkType::AppendOnly);
}
}

Ok(user_defined_sink_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless of upstream, when user specifies both upsert and force-append-only, it changed from returning error to Ok(SinkType::Upsert).

This is the CI failure.

} else {
if user_force_append_only {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If upstream is append-only, and omitting type = will result in SInkType::AppendOnly (the intended new behavior), isn't it more natural to tolerate an unnecessary force-append-only?

We can move this check after the match below and only reject for Upsert.

return Err(ErrorCode::SinkError(Box::new(Error::new(
ErrorKind::InvalidInput,
format!(
"The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \
Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.",
if syntax_legacy { "WITH" } else { "FORMAT ENCODE" }
"Cannot force the sink to be append-only without \"{}\".",
if syntax_legacy {
"type='append-only'"
} else {
"FORMAT PLAIN"
}
),
)))
.into())
.into());
}
(_, false, true) => {
Err(ErrorCode::SinkError(Box::new(Error::new(
ErrorKind::InvalidInput,
format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy { "type='append-only'" } else { "FORMAT PLAIN" }),
)))
.into())
match frontend_derived_append_only {
true => Ok(SinkType::AppendOnly),
false => Ok(SinkType::Upsert),
}
}
}
Expand Down
Loading