-
Notifications
You must be signed in to change notification settings - Fork 609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): change sink type to append only when plan is append-only #20558
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -420,12 +420,13 @@ impl StreamSink { | |
Ok(Self::new(input, sink_desc, log_store_type)) | ||
} | ||
|
||
fn is_user_defined_append_only(properties: &WithOptionsSecResolved) -> Result<bool> { | ||
fn sink_type_in_prop(properties: &WithOptionsSecResolved) -> Result<Option<SinkType>> { | ||
if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) { | ||
if sink_type != SINK_TYPE_APPEND_ONLY | ||
&& sink_type != SINK_TYPE_DEBEZIUM | ||
&& sink_type != SINK_TYPE_UPSERT | ||
{ | ||
if sink_type == SINK_TYPE_APPEND_ONLY { | ||
return Ok(Some(SinkType::AppendOnly)); | ||
} else if sink_type == SINK_TYPE_DEBEZIUM || sink_type == SINK_TYPE_UPSERT { | ||
return Ok(Some(SinkType::Upsert)); | ||
} else { | ||
return Err(ErrorCode::SinkError(Box::new(Error::new( | ||
ErrorKind::InvalidInput, | ||
format!( | ||
|
@@ -439,7 +440,7 @@ impl StreamSink { | |
.into()); | ||
} | ||
} | ||
Ok(properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY)) | ||
Ok(None) | ||
} | ||
|
||
fn is_user_force_append_only(properties: &WithOptionsSecResolved) -> Result<bool> { | ||
|
@@ -465,46 +466,63 @@ impl StreamSink { | |
format_desc: Option<&SinkFormatDesc>, | ||
) -> Result<SinkType> { | ||
let frontend_derived_append_only = input_append_only; | ||
let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc { | ||
let (user_defined_sink_type, user_force_append_only, syntax_legacy) = match format_desc { | ||
Some(f) => ( | ||
f.format == SinkFormat::AppendOnly, | ||
Some(match f.format { | ||
SinkFormat::AppendOnly => SinkType::AppendOnly, | ||
SinkFormat::Upsert | SinkFormat::Debezium => SinkType::Upsert, | ||
}), | ||
Self::is_user_force_append_only(&WithOptionsSecResolved::without_secrets( | ||
f.options.clone(), | ||
))?, | ||
false, | ||
), | ||
None => ( | ||
Self::is_user_defined_append_only(properties)?, | ||
Self::sink_type_in_prop(properties)?, | ||
Self::is_user_force_append_only(properties)?, | ||
true, | ||
), | ||
}; | ||
|
||
match ( | ||
frontend_derived_append_only, | ||
user_defined_append_only, | ||
user_force_append_only, | ||
) { | ||
(true, true, _) => Ok(SinkType::AppendOnly), | ||
(false, true, true) => Ok(SinkType::ForceAppendOnly), | ||
(_, false, false) => Ok(SinkType::Upsert), | ||
(false, true, false) => { | ||
Err(ErrorCode::SinkError(Box::new(Error::new( | ||
if let Some(user_defined_sink_type) = user_defined_sink_type { | ||
if user_defined_sink_type == SinkType::AppendOnly { | ||
if user_force_append_only { | ||
return Ok(SinkType::ForceAppendOnly); | ||
} | ||
if !frontend_derived_append_only { | ||
return Err(ErrorCode::SinkError(Box::new(Error::new( | ||
ErrorKind::InvalidInput, | ||
format!( | ||
"The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \ | ||
Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.", | ||
if syntax_legacy { "WITH" } else { "FORMAT ENCODE" } | ||
), | ||
))) | ||
.into()); | ||
} else { | ||
return Ok(SinkType::AppendOnly); | ||
} | ||
} | ||
|
||
Ok(user_defined_sink_type) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regardless of upstream, when user specifies both upsert and force-append-only, it changed from returning error to This is the CI failure. |
||
} else { | ||
if user_force_append_only { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If upstream is append-only, and omitting We can move this check after the match below and only reject for Upsert. |
||
return Err(ErrorCode::SinkError(Box::new(Error::new( | ||
ErrorKind::InvalidInput, | ||
format!( | ||
"The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. \ | ||
Notice that this will cause the sink executor to drop DELETE messages and convert UPDATE messages to INSERT.", | ||
if syntax_legacy { "WITH" } else { "FORMAT ENCODE" } | ||
"Cannot force the sink to be append-only without \"{}\".", | ||
if syntax_legacy { | ||
"type='append-only'" | ||
} else { | ||
"FORMAT PLAIN" | ||
} | ||
), | ||
))) | ||
.into()) | ||
.into()); | ||
} | ||
(_, false, true) => { | ||
Err(ErrorCode::SinkError(Box::new(Error::new( | ||
ErrorKind::InvalidInput, | ||
format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy { "type='append-only'" } else { "FORMAT PLAIN" }), | ||
))) | ||
.into()) | ||
match frontend_derived_append_only { | ||
true => Ok(SinkType::AppendOnly), | ||
false => Ok(SinkType::Upsert), | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When upstream is append-only, and user specifies both append-only and force-append-only, the
SinkType
changed fromAppendOnly
toForceAppendOnly
.