From 1a047ba406fcc61a045b8f438d7d3eb89a1abd07 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Wed, 19 Feb 2025 11:43:52 +0800
Subject: [PATCH 1/6] feat(frontend): redirect cast char to varchar (#20532)

---
 e2e_test/batch/functions/cast.slt.part | 16 ++++++++++++++++
 src/frontend/src/binder/expr/mod.rs    | 14 ++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/e2e_test/batch/functions/cast.slt.part b/e2e_test/batch/functions/cast.slt.part
index a0c8b31eec83d..4f9589031b25f 100644
--- a/e2e_test/batch/functions/cast.slt.part
+++ b/e2e_test/batch/functions/cast.slt.part
@@ -38,3 +38,19 @@ select date(1);
 
 query error unexpected arguments number
 select date();
+
+query ?
+select cast('a' as char(1));
+----
+a
+
+# Since we don't support the char type, a cast to char(2) behaves exactly like a cast to varchar, which differs from PostgreSQL's blank-padded char semantics.
+query ?
+select 'a'::char(2) = 'a '::char(2);
+----
+f
+
+query ?
+select 'a'::varchar = 'a '::varchar;
+----
+f
diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs
index 5c52bcd0897bc..e46405f0dbd73 100644
--- a/src/frontend/src/binder/expr/mod.rs
+++ b/src/frontend/src/binder/expr/mod.rs
@@ -937,6 +937,20 @@ impl Binder {
                 Err(ErrorCode::BindError(format!("Can't cast {} to regproc", lhs_ty)).into())
             }
         }
+        // Redirect casts to char to varchar, to make systems like Metabase happy.
+        // Char is not supported in RisingWave, but some ecosystem tools like Metabase use it.
+        // Notice that the behavior of `char` and `varchar` differs in PostgreSQL.
+        // The following SQL returns different results in PostgreSQL:
+        // ```
+        // select 'a'::char(2) = 'a '::char(2);
+        // ----------
+        // t
+        //
+        // select 'a'::varchar = 'a '::varchar;
+        // ----------
+        // f
+        // ```
+        AstDataType::Char(_) => self.bind_cast_inner(expr, DataType::Varchar),
         _ => self.bind_cast_inner(expr, bind_data_type(&data_type)?),
     }
 }

From 8fa011e9a3b5ad19f1fc033e67bc2c39f17928cf Mon Sep 17 00:00:00 2001
From: Yingjun Wu
Date: Wed, 19 Feb 2025 11:45:11 +0800
Subject: [PATCH 2/6] doc: Update README.md (#20529)

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index daea1aca5238e..443e31705c1de 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@
-### 🌊 Ride the Wave of Real-Time Data. +### 🌊 Ride the Wave of Streaming Data.

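The core of PATCH 1/6 is the single `AstDataType::Char(_)` arm above. As a self-contained sketch of the same technique — using simplified stand-in enums rather than RisingWave's real `AstDataType` and `DataType` definitions — the redirect looks like this:

```rust
// Minimal sketch of redirecting `char(n)` cast targets to varchar.
// `AstDataType` and `DataType` are simplified stand-ins for the
// parser-level and engine-level types in the real binder.

#[derive(Debug, PartialEq)]
enum AstDataType {
    Char(Option<u64>), // `char` or `char(n)` in the SQL text
    Varchar,
    Int,
}

#[derive(Debug, PartialEq)]
enum DataType {
    Varchar,
    Int32,
}

/// Resolve the target type of a cast. Since a true fixed-width char type
/// is not supported, `char(n)` is bound exactly like varchar — which drops
/// PostgreSQL's blank-padded comparison semantics for bpchar.
fn bind_cast_target(ast_ty: &AstDataType) -> DataType {
    match ast_ty {
        AstDataType::Char(_) | AstDataType::Varchar => DataType::Varchar,
        AstDataType::Int => DataType::Int32,
    }
}

fn main() {
    // `cast('a' as char(2))` ends up as a plain varchar cast.
    assert_eq!(bind_cast_target(&AstDataType::Char(Some(2))), DataType::Varchar);
    assert_eq!(bind_cast_target(&AstDataType::Int), DataType::Int32);
    println!("char(2) binds to {:?}", bind_cast_target(&AstDataType::Char(Some(2))));
}
```

Collapsing `char(n)` into varchar keeps casts working for tools that emit `char`, at the cost of PostgreSQL-compatible trailing-blank comparison — exactly the trade-off the new e2e test pins down.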
From 587cdd9d73718c8f72b731872b035deee463f771 Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Wed, 19 Feb 2025 12:10:50 +0800
Subject: [PATCH 3/6] test: add retry for check on drop table connector
 (#20534)

Co-authored-by: tabversion
---
 e2e_test/source_inline/kafka/alter_table_drop_connector.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e_test/source_inline/kafka/alter_table_drop_connector.slt b/e2e_test/source_inline/kafka/alter_table_drop_connector.slt
index c927a390e17ed..021dbe4e76885 100644
--- a/e2e_test/source_inline/kafka/alter_table_drop_connector.slt
+++ b/e2e_test/source_inline/kafka/alter_table_drop_connector.slt
@@ -95,7 +95,7 @@ FORMAT PLAIN ENCODE AVRO (
 
 sleep 1s
 
-query ??
+query ?? retry 3 backoff 5s
 select foo, bar from avro_drop_table_connector_test_table
 ----
 ABC 1

From 0d4c90fa8494f602c47c87b6b15017ed0029ba16 Mon Sep 17 00:00:00 2001
From: Kexiang Wang
Date: Tue, 18 Feb 2025 23:56:34 -0500
Subject: [PATCH 4/6] feat(cdc): fix generated column for table on cdc source
 (#20380)

Co-authored-by: stonepage <40830455+st1page@users.noreply.github.com>
---
 .../source_legacy/cdc/cdc.share_stream.slt   | 44 +++++++----
 .../cdc/cdc.validate.postgres.slt            | 20 +++++
 src/frontend/src/handler/create_table.rs     | 51 +++++++++----
 src/meta/src/rpc/ddl_controller.rs           | 74 ++++++++++++++-----
 4 files changed, 144 insertions(+), 45 deletions(-)

diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
index 435490f4ca7b1..ae48ad00b56a8 100644
--- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt
+++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt
@@ -161,6 +161,17 @@ create table non_exist ( id INT,
 PRIMARY KEY (id)
 ) from mysql_mytest table 'mytest.non_exist';
 
+statement error Not supported: Non-generated column found after a generated column.
+create table orders_test (
+   next_order_id int as order_id + 1,
+   order_id int,
+   order_date timestamp,
+   customer_name string,
+   price decimal,
+   product_id int,
+   order_status smallint,
+   PRIMARY KEY (order_id)
+) from mysql_mytest table 'mytest.orders';
 
 statement ok
 create table orders_test (
@@ -170,6 +181,7 @@ create table orders_test (
    price decimal,
    product_id int,
    order_status smallint,
+   next_order_id int as order_id + 1,
    PRIMARY KEY (order_id)
 ) from mysql_mytest table 'mytest.orders';
 
@@ -305,11 +317,11 @@ Milk Milk is a white liquid food
 Juice 100ml Juice
 
 query ITTT
-SELECT order_id,order_date,customer_name,product_id FROM orders_test order by order_id limit 3
+SELECT order_id,order_date,customer_name,product_id,next_order_id FROM orders_test order by order_id limit 3
 ----
-10001 2020-07-30 10:08:22 Jark 102
-10002 2020-07-30 10:11:09 Sally 105
-10003 2020-07-30 12:00:30 Edward 106
+10001 2020-07-30 10:08:22 Jark 102 10002
+10002 2020-07-30 10:11:09 Sally 105 10003
+10003 2020-07-30 12:00:30 Edward 106 10004
 
 query IIIIITTTTTTTTT
 SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint;
@@ -393,6 +405,7 @@ CREATE TABLE upper_orders_shared (
   name varchar
 ) FROM pg_source TABLE 'public.Orders';
 
+# FIXME(kexiang): Currently, the generated columns (next_id in this case) must be at the end of the schema; otherwise the frontend will throw an error.
statement ok CREATE TABLE person_new ( id int, @@ -400,6 +413,7 @@ CREATE TABLE person_new ( email_address varchar, credit_card varchar, city varchar, + next_id int as id + 1, PRIMARY KEY (id) ) INCLUDE TIMESTAMP AS commit_ts INCLUDE DATABASE_NAME as database_name @@ -459,22 +473,22 @@ cdc_test public person query ITTTT -SELECT id,name,email_address,credit_card,city from person_new order by id; +SELECT id,name,email_address,credit_card,city,next_id from person_new order by id; ---- -1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne -1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise -1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles -1100 noris ypl@qbxfg.com 1864 2539 enne -1101 white myc@xpmpe.com 8157 6974 se -1102 spencer wip@dkaap.com 9481 6270 angeles +1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne 1001 +1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise 1002 +1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles 1003 +1100 noris ypl@qbxfg.com 1864 2539 enne 1101 +1101 white myc@xpmpe.com 8157 6974 se 1102 +1102 spencer wip@dkaap.com 9481 6270 angeles 1103 # historical data query ITTTT -SELECT id,name,email_address,credit_card,city from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id; +SELECT id,name,email_address,credit_card,city,next_id from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id; ---- -1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne -1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise -1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles +1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne 1001 +1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise 1002 +1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles 1003 # incremental data query ITTTT diff --git a/e2e_test/source_legacy/cdc/cdc.validate.postgres.slt b/e2e_test/source_legacy/cdc/cdc.validate.postgres.slt index 1b14942c7af63..30df0d298ee71 100644 --- a/e2e_test/source_legacy/cdc/cdc.validate.postgres.slt +++ b/e2e_test/source_legacy/cdc/cdc.validate.postgres.slt @@ -169,6 +169,26 @@ create table shipments ( slot.name = 'shipments' ) format canal encode csv; +statement error Not supported: Non-generated column found after a generated column. 
+create table shipments (
+  next_shipment_id INTEGER as shipment_id + 1,
+  shipment_id INTEGER,
+  order_id INTEGER,
+  origin STRING,
+  destination STRING,
+  is_arrived boolean,
+  PRIMARY KEY (shipment_id)
+) with (
+  connector = 'postgres-cdc',
+  hostname = '${PGHOST:localhost}',
+  port = '${PGPORT:5432}',
+  username = '${PGUSER:$USER}',
+  password = '${PGPASSWORD:}',
+  database.name = '${PGDATABASE:postgres}',
+  table.name = 'shipments',
+  slot.name = 'shipments'
+);
+
 statement ok
 explain create table numeric_to_rw_int256 (
   id int,
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 0010a366ad4da..00948df81061e 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -42,7 +42,7 @@ use risingwave_connector::source::cdc::build_cdc_table_id;
 use risingwave_connector::source::cdc::external::{
     ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
-use risingwave_connector::{source, WithOptionsSecResolved};
+use risingwave_connector::{source, WithOptionsSecResolved, WithPropertiesExt};
 use risingwave_pb::catalog::connection::Info as ConnectionInfo;
 use risingwave_pb::catalog::connection_params::ConnectionType;
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
@@ -556,6 +556,9 @@ pub(crate) async fn gen_create_table_plan_with_source(
     let session = &handler_args.session;
     let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
+    if with_properties.is_shareable_cdc_connector() {
+        generated_columns_check_for_cdc_table(&column_defs)?;
+    }
 
     let db_name: &str = &session.database();
     let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
@@ -1153,6 +1156,8 @@ pub(super) async fn handle_create_table_plan(
             let (format_encode, source_name) =
                 Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
 
+            generated_columns_check_for_cdc_table(&column_defs)?;
+
             let source = {
                 let catalog_reader = session.env().catalog_reader().read_guard();
                 let schema_path =
@@ -1204,18 +1209,12 @@ pub(super) async fn handle_create_table_plan(
                 .collect();
 
             for col in &mut columns {
-                let external_column_desc =
-                    *external_columns.get(col.name()).ok_or_else(|| {
-                        ErrorCode::ConnectorError(
-                            format!(
-                                "Column '{}' not found in the upstream database",
-                                col.name()
-                            )
-                            .into(),
-                        )
-                    })?;
-                col.column_desc.generated_or_default_column =
-                    external_column_desc.generated_or_default_column.clone();
+                // Keep the default column aligned with the external table.
+                // Note that generated columns have not been initialized yet; if a column is not found here, it should be a generated column.
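+                // (A column that is absent from `external_columns` can only be a
+                // generated column: it exists in the RisingWave-side definition but
+                // not in the upstream table, so there is nothing to copy for it.)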
+                if let Some(external_column_desc) = external_columns.get(col.name()) {
+                    col.column_desc.generated_or_default_column =
+                        external_column_desc.generated_or_default_column.clone();
+                }
             }
             (columns, pk_names)
         }
@@ -1257,6 +1256,32 @@ pub(super) async fn handle_create_table_plan(
     Ok((plan, source, table, job_type))
 }
 
+fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
+    let mut found_generated_column = false;
+    for column in columns {
+        let mut is_generated = false;
+
+        for option_def in &column.options {
+            if let ColumnOption::GeneratedColumns(_) = option_def.option {
+                is_generated = true;
+                break;
+            }
+        }
+
+        if is_generated {
+            found_generated_column = true;
+        } else if found_generated_column {
+            return Err(ErrorCode::NotSupported(
+                "Non-generated column found after a generated column.".into(),
+                "Ensure that all generated columns appear at the end of the cdc table definition."
+                    .into(),
+            )
+            .into());
+        }
+    }
+    Ok(())
+}
+
 fn sanity_check_for_cdc_table(
     append_only: bool,
     column_defs: &Vec<ColumnDef>,
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 67f88a96bfb45..de2280c05fe6f 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -650,28 +650,68 @@ impl DdlController {
             )
         })?;
 
-        {
-            if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) =
-                stream_scan_fragment.nodes.as_ref().unwrap().node_body
-                && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
-            {
-                let options_with_secret = WithOptionsSecResolved::new(
-                    cdc_table_desc.connect_properties.clone(),
-                    cdc_table_desc.secret_refs.clone(),
-                );
-                let mut props = ConnectorProperties::extract(options_with_secret, true)?;
-                props.init_from_pb_cdc_table_desc(cdc_table_desc);
-
-                // try creating a split enumerator to validate
-                let _enumerator = props
-                    .create_split_enumerator(SourceEnumeratorContext::dummy().into())
-                    .await?;
-                tracing::debug!(?table.id, "validate cdc table success");
+        assert_eq!(
+            stream_scan_fragment.actors.len(),
+            1,
+            "Stream scan fragment should have only one actor"
+        );
+        let mut found_cdc_scan = false;
+        match &stream_scan_fragment.nodes.as_ref().unwrap().node_body {
+            Some(NodeBody::StreamCdcScan(_)) => {
+                if Self::validate_cdc_table_inner(
+                    &stream_scan_fragment.nodes.as_ref().unwrap().node_body,
+                    table.id,
+                )
+                .await?
+                {
+                    found_cdc_scan = true;
+                }
+            }
+            // When there are generated columns, the cdc scan node is wrapped in a project node.
+            Some(NodeBody::Project(_)) => {
+                for input in &stream_scan_fragment.nodes.as_ref().unwrap().input {
+                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await?
+                    {
+                        found_cdc_scan = true;
+                    }
+                }
+            }
+            _ => {
+                bail!("Unexpected node body for stream cdc scan");
+            }
+        };
+        if !found_cdc_scan {
+            bail!("No stream cdc scan node found in stream scan fragment");
         }
         Ok(())
     }
 
+    async fn validate_cdc_table_inner(
+        node_body: &Option<NodeBody>,
+        table_id: u32,
+    ) -> MetaResult<bool> {
+        if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = node_body
+            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
+        {
+            let options_with_secret = WithOptionsSecResolved::new(
+                cdc_table_desc.connect_properties.clone(),
+                cdc_table_desc.secret_refs.clone(),
+            );
+
+            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
+            props.init_from_pb_cdc_table_desc(cdc_table_desc);
+
+            // Try creating a split enumerator to validate
+            let _enumerator = props
+                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
+                .await?;
+
+            tracing::debug!(?table_id, "validate cdc table success");
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
     // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
     // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
     // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.

From 01ef2d649944b39ac8925b44332f6c185a87e1f3 Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Wed, 19 Feb 2025 13:38:56 +0800
Subject: [PATCH 5/6] feat(frontend): support extended query mode for declare
 subscription cursor (#20528)

---
 .../test/java/com/risingwave/TestCursor.java | 41 +++++++++++++++++--
 .../test/java/com/risingwave/TestUtils.java  | 12 ++++++
 src/frontend/src/binder/declare_cursor.rs    |  9 +++-
 src/frontend/src/binder/statement.rs         | 23 ++++++++---
 src/frontend/src/handler/declare_cursor.rs   | 32 +++++++--------
 src/frontend/src/handler/extended_handle.rs  | 10 ++---
 src/frontend/src/handler/privilege.rs        |  1 +
 src/frontend/src/handler/query.rs            | 39 +++++++++++-------
 src/frontend/src/planner/statement.rs        |  1 +
 9 files changed, 121 insertions(+), 47 deletions(-)

diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestCursor.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCursor.java
index c583f05856524..6b8a3d6608b57 100644
--- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestCursor.java
+++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCursor.java
@@ -9,7 +9,7 @@ public class TestCursor {
 
     public static void createTable() throws SQLException {
-        try (Connection connection = TestUtils.establishConnection()) {
+        try (Connection connection = TestUtils.establishExtendedConnection()) {
             String createTableSQL = "CREATE TABLE test_table (" +
                     "id INT PRIMARY KEY, " +
                     "trading_date DATE, " +
                     "volume INT)";
@@ -25,7 +25,7 @@ public static void createTable() throws SQLException {
 
     public static void dropTable() throws SQLException {
         String dropSourceQuery = "DROP TABLE test_table;";
-        try (Connection connection = TestUtils.establishConnection()) {
+        try (Connection connection = TestUtils.establishExtendedConnection()) {
             Statement statement = connection.createStatement();
             statement.executeUpdate(dropSourceQuery);
             System.out.println("Table test_table dropped successfully.");
         }
     }
 
     public static void readWithExtendedCursor() throws SQLException {
-        try (Connection
connection = TestUtils.establishConnection()) { + try (Connection connection = TestUtils.establishExtendedConnection()) { connection.setAutoCommit(false); Statement statement = connection.createStatement(); statement.execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ"); @@ -60,10 +60,45 @@ public static void readWithExtendedCursor() throws SQLException { } } + public static void readWithExtendedSubscriptionCursor() throws SQLException { + try (Connection connection = TestUtils.establishExtendedConnection()) { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + statement.execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + statement.execute("CREATE SUBSCRIPTION sub FROM public.test_table WITH(retention = '1d')"); + + String declareCursorSql = "DECLARE c1 SUBSCRIPTION CURSOR FOR sub FULL"; + PreparedStatement pstmt = connection.prepareStatement(declareCursorSql); + pstmt.execute(); + + statement.execute("FETCH 100 FROM c1"); + ResultSet resultSet = statement.getResultSet(); + + while (resultSet != null && resultSet.next()) { + Assertions.assertEquals(resultSet.getInt("id"), 1); + Assertions.assertEquals(resultSet.getString("trading_date"), "2024-07-10"); + Assertions.assertEquals(resultSet.getInt("volume"), 23); + } + + statement.execute("CLOSE c1"); + statement.execute("COMMIT"); + statement.execute("DROP SUBSCRIPTION sub"); + + System.out.println("Data in table read with cursor successfully."); + } + } + @Test public void testCursor() throws SQLException { createTable(); readWithExtendedCursor(); dropTable(); } + + @Test + public void testSubscriptionCursor() throws SQLException { + createTable(); + readWithExtendedSubscriptionCursor(); + dropTable(); + } } diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java index 245c2f36a06d2..74c2ef5b3a436 100644 --- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java @@ -18,4 +18,16 @@ public static Connection establishConnection() throws SQLException { return DriverManager.getConnection(url, props); } + + public static Connection establishExtendedConnection() throws SQLException { + final String url = "jdbc:postgresql://risingwave-standalone:4566/dev"; + final String user = "root"; + final String password = ""; + + Properties props = new Properties(); + props.setProperty("user", user); + props.setProperty("password", password); + + return DriverManager.getConnection(url, props); + } } diff --git a/src/frontend/src/binder/declare_cursor.rs b/src/frontend/src/binder/declare_cursor.rs index feec8333f994a..58df2f920f967 100644 --- a/src/frontend/src/binder/declare_cursor.rs +++ b/src/frontend/src/binder/declare_cursor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_sqlparser::ast::Ident;
+use risingwave_sqlparser::ast::{Ident, ObjectName, Since};
 
 use super::statement::RewriteExprsRecursive;
 use crate::binder::BoundQuery;
@@ -30,3 +30,10 @@ impl RewriteExprsRecursive for BoundDeclareCursor {
         self.query.rewrite_exprs_recursive(rewriter);
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct BoundDeclareSubscriptionCursor {
+    pub cursor_name: Ident,
+    pub subscription_name: ObjectName,
+    pub rw_timestamp: Since,
+}
diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs
index a65d382069e80..7047a526646c4 100644
--- a/src/frontend/src/binder/statement.rs
+++ b/src/frontend/src/binder/statement.rs
@@ -16,7 +16,7 @@ use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::Field;
 use risingwave_sqlparser::ast::{DeclareCursor, Statement};
 
-use super::declare_cursor::BoundDeclareCursor;
+use super::declare_cursor::{BoundDeclareCursor, BoundDeclareSubscriptionCursor};
 use super::delete::BoundDelete;
 use super::fetch_cursor::BoundFetchCursor;
 use super::update::BoundUpdate;
@@ -32,6 +32,7 @@ pub enum BoundStatement {
     Update(Box<BoundUpdate>),
     Query(Box<BoundQuery>),
     DeclareCursor(Box<BoundDeclareCursor>),
+    DeclareSubscriptionCursor(Box<BoundDeclareSubscriptionCursor>),
     FetchCursor(Box<BoundFetchCursor>),
     CreateView(Box<BoundCreateView>),
 }
@@ -53,6 +54,7 @@ impl BoundStatement {
                 .map_or(vec![], |s| s.fields().into()),
             BoundStatement::Query(q) => q.schema().fields().into(),
             BoundStatement::DeclareCursor(_) => vec![],
+            BoundStatement::DeclareSubscriptionCursor(_) => vec![],
             BoundStatement::FetchCursor(f) => f
                 .returning_schema
                 .as_ref()
@@ -95,8 +97,8 @@ impl Binder {
 
             Statement::Query(q) => Ok(BoundStatement::Query(self.bind_query(*q)?.into())),
 
-            Statement::DeclareCursor { stmt } => {
-                if let DeclareCursor::Query(body) = stmt.declare_cursor {
+            Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
+                DeclareCursor::Query(body) => {
                     let query = self.bind_query(*body)?;
                     Ok(BoundStatement::DeclareCursor(
                         BoundDeclareCursor {
                             cursor_name: stmt.cursor_name,
                             query: query.into(),
                         }
                         .into(),
                     ))
-                } else {
-                    bail_not_implemented!("unsupported statement {:?}", stmt)
                 }
-            }
+                DeclareCursor::Subscription(subscription_name, rw_timestamp) => {
+                    Ok(BoundStatement::DeclareSubscriptionCursor(
+                        BoundDeclareSubscriptionCursor {
+                            cursor_name: stmt.cursor_name,
+                            subscription_name,
+                            rw_timestamp,
+                        }
+                        .into(),
+                    ))
+                }
+            },
 
             // Note(eric): Can I just bind CreateView to Query??
            Statement::CreateView {
                query,
@@ -153,6 +163,7 @@ impl RewriteExprsRecursive for BoundStatement {
             BoundStatement::Query(inner) => inner.rewrite_exprs_recursive(rewriter),
             BoundStatement::DeclareCursor(inner) => inner.rewrite_exprs_recursive(rewriter),
             BoundStatement::FetchCursor(_) => {}
+            BoundStatement::DeclareSubscriptionCursor(_) => {}
             BoundStatement::CreateView(inner) => inner.rewrite_exprs_recursive(rewriter),
         }
     }
diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs
index be431c4a43052..e219cc183aa71 100644
--- a/src/frontend/src/handler/declare_cursor.rs
+++ b/src/frontend/src/handler/declare_cursor.rs
@@ -35,16 +35,16 @@ use crate::session::SessionImpl;
 use crate::{Binder, OptimizerContext};
 
 pub async fn handle_declare_cursor(
-    handle_args: HandlerArgs,
+    handler_args: HandlerArgs,
     stmt: DeclareCursorStatement,
 ) -> Result<RwPgResponse> {
     match stmt.declare_cursor {
         risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
-            handle_declare_query_cursor(handle_args, stmt.cursor_name, query).await
+            handle_declare_query_cursor(handler_args, stmt.cursor_name, query).await
         }
         risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
             handle_declare_subscription_cursor(
-                handle_args,
+                handler_args,
                 sub_name,
                 stmt.cursor_name,
                 rw_timestamp,
             )
             .await
         }
     }
 }
-async fn handle_declare_subscription_cursor(
-    handle_args: HandlerArgs,
+pub async fn handle_declare_subscription_cursor(
+    handler_args: HandlerArgs,
     sub_name: ObjectName,
     cursor_name: Ident,
     rw_timestamp: Since,
 ) -> Result<RwPgResponse> {
-    let session = handle_args.session.clone();
+    let session = handler_args.session.clone();
     let subscription = {
         let db_name = &session.database();
         let (sub_schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, sub_name)?;
@@ -89,7 +89,7 @@ async fn handle_declare_subscription_cursor(
             start_rw_timestamp,
             subscription.dependent_table_id,
             subscription,
-            &handle_args,
+            &handler_args,
         )
         .await
     {
@@ -122,13 +122,13 @@
 }
 
 async fn handle_declare_query_cursor(
-    handle_args: HandlerArgs,
+    handler_args: HandlerArgs,
     cursor_name: Ident,
     query: Box<Query>,
 ) -> Result<RwPgResponse> {
     let (chunk_stream, fields) =
-        create_stream_for_cursor_stmt(handle_args.clone(), Statement::Query(query)).await?;
-    handle_args
+        create_stream_for_cursor_stmt(handler_args.clone(), Statement::Query(query)).await?;
+    handler_args
         .session
         .get_cursor_manager()
         .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
@@ -137,15 +137,15 @@ async fn handle_declare_query_cursor(
 }
 
 pub async fn handle_bound_declare_query_cursor(
-    handle_args: HandlerArgs,
+    handler_args: HandlerArgs,
     cursor_name: Ident,
     plan_fragmenter_result: BatchPlanFragmenterResult,
 ) -> Result<RwPgResponse> {
-    let session = handle_args.session.clone();
+    let session = handler_args.session.clone();
     let (chunk_stream, fields) =
         create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?;
 
-    handle_args
+    handler_args
         .session
         .get_cursor_manager()
         .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
@@ -154,12 +154,12 @@ pub async fn handle_bound_declare_query_cursor(
 }
 
 pub async fn create_stream_for_cursor_stmt(
-    handle_args: HandlerArgs,
+    handler_args: HandlerArgs,
     stmt: Statement,
 ) -> Result<(CursorDataChunkStream, Vec<Field>)> {
-    let session = handle_args.session.clone();
+    let session = handler_args.session.clone();
     let plan_fragmenter_result = {
-        let context =
OptimizerContext::from_handler_args(handle_args); + let context = OptimizerContext::from_handler_args(handler_args); let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; gen_batch_plan_fragmenter(&session, plan_result)? }; diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index f4184e159c16c..b797aaac36468 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -20,7 +20,7 @@ use bytes::Bytes; use pgwire::types::Format; use risingwave_common::bail_not_implemented; use risingwave_common::types::DataType; -use risingwave_sqlparser::ast::{CreateSink, DeclareCursor, Query, Statement}; +use risingwave_sqlparser::ast::{CreateSink, Query, Statement}; use super::query::BoundResult; use super::{fetch_cursor, handle, query, HandlerArgs, RwPgResponse}; @@ -112,12 +112,8 @@ pub async fn handle_parse( Statement::FetchCursor { .. } => { fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await } - Statement::DeclareCursor { stmt } => { - if let DeclareCursor::Query(_) = stmt.declare_cursor { - query::handle_parse(handler_args, statement, specific_param_types) - } else { - bail_not_implemented!("DECLARE SUBSCRIPTION CURSOR with parameters"); - } + Statement::DeclareCursor { .. } => { + query::handle_parse(handler_args, statement, specific_param_types) } Statement::CreateView { query, diff --git a/src/frontend/src/handler/privilege.rs b/src/frontend/src/handler/privilege.rs index 06b68ed2ccc3c..ecd5524980f30 100644 --- a/src/frontend/src/handler/privilege.rs +++ b/src/frontend/src/handler/privilege.rs @@ -212,6 +212,7 @@ impl SessionImpl { BoundStatement::DeclareCursor(ref declare_cursor) => { resolve_query_privileges(&mut items, &mut check_databases, &declare_cursor.query); } + BoundStatement::DeclareSubscriptionCursor(_) => unimplemented!(), BoundStatement::FetchCursor(_) => unimplemented!(), BoundStatement::CreateView(ref create_view) => { resolve_query_privileges(&mut items, &mut check_databases, &create_view.query); diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index f0c23cb409b1d..ff8c4a372eec8 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -156,20 +156,31 @@ pub async fn handle_execute( ) .await } - Statement::DeclareCursor { stmt } => { - let session = handler_args.session.clone(); - let plan_fragmenter_result = { - let context = OptimizerContext::from_handler_args(handler_args.clone()); - let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?; - gen_batch_plan_fragmenter(&session, plan_result)? - }; - declare_cursor::handle_bound_declare_query_cursor( - handler_args, - stmt.cursor_name, - plan_fragmenter_result, - ) - .await - } + Statement::DeclareCursor { stmt } => match stmt.declare_cursor { + risingwave_sqlparser::ast::DeclareCursor::Query(_) => { + let session = handler_args.session.clone(); + let plan_fragmenter_result = { + let context = OptimizerContext::from_handler_args(handler_args.clone()); + let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?; + gen_batch_plan_fragmenter(&session, plan_result)? 
+ }; + declare_cursor::handle_bound_declare_query_cursor( + handler_args, + stmt.cursor_name, + plan_fragmenter_result, + ) + .await + } + risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => { + declare_cursor::handle_declare_subscription_cursor( + handler_args, + sub_name, + stmt.cursor_name, + rw_timestamp, + ) + .await + } + }, _ => unreachable!(), } } diff --git a/src/frontend/src/planner/statement.rs b/src/frontend/src/planner/statement.rs index 78a55a34cec8c..df1031803263b 100644 --- a/src/frontend/src/planner/statement.rs +++ b/src/frontend/src/planner/statement.rs @@ -25,6 +25,7 @@ impl Planner { BoundStatement::Update(u) => self.plan_update(*u), BoundStatement::Query(q) => self.plan_query(*q), BoundStatement::DeclareCursor(d) => self.plan_query(*d.query), + BoundStatement::DeclareSubscriptionCursor(_) => unimplemented!(), BoundStatement::FetchCursor(_) => unimplemented!(), BoundStatement::CreateView(c) => self.plan_query(*c.query), } From 7df444c14f5b78effff4e9634a9488befdcd6467 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 19 Feb 2025 14:40:00 +0800 Subject: [PATCH 6/6] chore(test): upgrade sqllogictest to 0.27.1 (#20516) Signed-off-by: Richard Chien --- Cargo.lock | 5 ++- Makefile.toml | 2 +- ci/Dockerfile | 2 +- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 14 +++---- ci/scripts/e2e-test-parallel.sh | 38 +++++++++---------- ci/scripts/e2e-test-serial.sh | 11 +----- e2e_test/commands/internal_table.mjs | 17 ++++++--- .../queryable_internal_state/group_agg.slt | 11 ++++-- src/tests/simulation/Cargo.toml | 2 +- src/tests/simulation/src/client.rs | 2 + 11 files changed, 55 insertions(+), 51 deletions(-) rename e2e_test/{ => streaming}/queryable_internal_state/group_agg.slt (68%) diff --git a/Cargo.lock b/Cargo.lock index 7d57d6a9125d3..e564d37474703 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13554,9 +13554,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.26.3" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dba5b678841955201299013ad50dc34e4b75acf89d7cfbb0769a59910d9081b" +checksum = "07a06aea5e52b0a63b9d8328b46ea2740cdab4cac13def8ef4f2e5288610f9ed" dependencies = [ "async-trait", "educe", @@ -13568,6 +13568,7 @@ dependencies = [ "libtest-mimic", "md-5", "owo-colors", + "rand 0.8.5", "regex", "similar", "subst", diff --git a/Makefile.toml b/Makefile.toml index d088108964225..f6eb3446b8321 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1359,7 +1359,7 @@ echo "All processes has exited." 
[tasks.slt] category = "RiseDev - Test - SQLLogicTest" -install_crate = { min_version = "0.26.3", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { min_version = "0.27.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 35dc259a26106..5fc89f36e1d10 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -70,7 +70,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --locked cargo-llvm-cov cargo-nextest cargo-sort cargo-cache cargo-machete \ cargo-make@0.37.9 \ - sqllogictest-bin@0.26.3 \ + sqllogictest-bin@0.27.1 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 096b8cbaa3779..39d9fa4454312 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20250115 +export BUILD_ENV_VERSION=v20250218 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index ff77442e9712c..ef1ae97355895 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -90,7 +90,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 depends_on: - mysql - mysql-meta @@ -106,7 +106,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 depends_on: - mysql - mysql-meta @@ -129,13 +129,13 @@ services: - ..:/risingwave rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 volumes: - ..:/risingwave # Standard environment for CI, including MySQL and Postgres for metadata. ci-standard-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 depends_on: - mysql-meta - db @@ -143,14 +143,14 @@ services: - ..:/risingwave iceberg-engine-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 depends_on: - db volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. 
@@ -161,7 +161,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250115 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20250218 depends_on: db: condition: service_healthy diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index effe8e364dd24..d88064a118747 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -26,34 +26,34 @@ download_and_prepare_rw "$profile" common echo "--- Download artifacts" download-and-decompress-artifact e2e_test_generated ./ +start_cluster() { + echo "--- Start cluster" + risedev ci-start ci-3streaming-2serving-3fe +} + kill_cluster() { - echo "--- Kill cluster" - risedev ci-kill + echo "--- Kill cluster" + risedev ci-kill } host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567) -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_storage::hummock::compactor::compactor_runner=warn" - -echo "--- e2e, ci-3streaming-2serving-3fe, streaming" -RUST_LOG=$RUST_LOG \ -risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" - +echo "--- e2e, parallel, streaming" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_storage::hummock::compactor::compactor_runner=warn" \ +start_cluster +risedev slt "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster -echo "--- e2e, ci-3streaming-2serving-3fe, batch" -RUST_LOG=$RUST_LOG \ -risedev ci-start ci-3streaming-2serving-3fe +echo "--- e2e, parallel, batch" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_storage::hummock::compactor::compactor_runner=warn" \ +start_cluster # Exclude files that contain ALTER SYSTEM commands find ./e2e_test/ddl -name "*.slt" -type f -exec grep -L "ALTER SYSTEM" {} \; | xargs -r sqllogictest "${host_args[@]}" -d dev --junit "parallel-batch-ddl-${profile}" --label "parallel" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" - +risedev slt "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster -echo "--- e2e, ci-3streaming-2serving-3fe, generated" -RUST_LOG=$RUST_LOG \ -risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" - +echo "--- e2e, parallel, generated" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_storage::hummock::compactor::compactor_runner=warn" \ +start_cluster +risedev slt "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/ci/scripts/e2e-test-serial.sh b/ci/scripts/e2e-test-serial.sh index 741a0fe7a2883..2ee27d1ec91de 100755 --- a/ci/scripts/e2e-test-serial.sh +++ b/ci/scripts/e2e-test-serial.sh @@ -84,8 +84,7 @@ echo "--- e2e, $mode, streaming" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_stream::common::table::state_table=warn" \ cluster_start # Please make sure the regression is expected before increasing the timeout. 
-sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}" -risedev slt -p 4566 -d dev './e2e_test/queryable_internal_state/**/*.slt' --junit "queryable-internal-state-${profile}" +risedev slt -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}" sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt' echo "--- Kill cluster" @@ -145,14 +144,6 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/python_udf.slt' echo "--- Kill cluster" cluster_stop -echo "--- e2e, $mode, generated" -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -cluster_start -sqllogictest -p 4566 -d dev './e2e_test/generated/**/*.slt' --junit "generated-${profile}" - -echo "--- Kill cluster" -cluster_stop - # only run if mode is not single-node or standalone if [[ "$mode" != "single-node" && "$mode" != "standalone" ]]; then echo "--- e2e, ci-3cn-1fe-with-recovery, error ui" diff --git a/e2e_test/commands/internal_table.mjs b/e2e_test/commands/internal_table.mjs index 1deb727e7c3fe..73edb8d4bddf8 100755 --- a/e2e_test/commands/internal_table.mjs +++ b/e2e_test/commands/internal_table.mjs @@ -4,19 +4,23 @@ // https://google.github.io/zx/ const { + db: db_name, name: job_name, type: table_type, count: count, } = minimist(process.argv.slice(3), { - string: ["name", "type"], + string: ["db", "name", "type"], boolean: ["count"], + default: { + "db": "dev", + } }); // Return an array of CSV string -async function psql(query) { +async function psql(db_name, query) { return ( await $` -psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \ +psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d ${db_name} \ --csv --tuples-only --quiet -c ${query} ` ) @@ -28,9 +32,10 @@ psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root // If `table_type` is null, return all internal tables for the job. // If `job_name` is null, return all jobs' internal tables. -async function select_internal_table(job_name, table_type) { +async function select_internal_table(db_name, job_name, table_type) { // Note: if we have `t1`, and `t1_balabala`, the latter one will also be matched 😄. 
const internal_tables = await psql( + db_name, `select name from rw_internal_tables where name like '__internal_${job_name}_%_${table_type}_%'` ); if (internal_tables.length == 0) { @@ -42,7 +47,7 @@ async function select_internal_table(job_name, table_type) { const res = new Map( await Promise.all( internal_tables.map(async (t) => { - let rows = await psql(`select * from ${t}`); + let rows = await psql(db_name, `select * from ${t}`); return [t, rows]; }) ) @@ -50,7 +55,7 @@ async function select_internal_table(job_name, table_type) { return res; } -const tables = await select_internal_table(job_name, table_type); +const tables = await select_internal_table(db_name, job_name, table_type); for (const [table_name, rows] of tables) { if (tables.size > 1) { console.log(`Table: ${table_name}`); diff --git a/e2e_test/queryable_internal_state/group_agg.slt b/e2e_test/streaming/queryable_internal_state/group_agg.slt similarity index 68% rename from e2e_test/queryable_internal_state/group_agg.slt rename to e2e_test/streaming/queryable_internal_state/group_agg.slt index 6dd1ed2b338e0..c75b2c298eb3d 100644 --- a/e2e_test/queryable_internal_state/group_agg.slt +++ b/e2e_test/streaming/queryable_internal_state/group_agg.slt @@ -1,5 +1,7 @@ # See https://github.com/risingwavelabs/risingwave/pull/20435 for the bug. +control substitution on + statement ok SET RW_IMPLICIT_FLUSH TO true; @@ -17,8 +19,9 @@ select * from mv; ---- 1 7 +skipif madsim system ok -internal_table.mjs --name mv --type hashaggstate --count +internal_table.mjs --db $__DATABASE__ --name mv --type hashaggstate --count ---- count: 1 @@ -32,8 +35,9 @@ select * from mv order by foo; 1 7 2 7 +skipif madsim system ok -internal_table.mjs --name mv --type hashaggstate --count +internal_table.mjs --db $__DATABASE__ --name mv --type hashaggstate --count ---- count: 2 @@ -46,8 +50,9 @@ select * from mv; ---- 2 7 +skipif madsim system ok -internal_table.mjs --name mv --type hashaggstate --count +internal_table.mjs --db $__DATABASE__ --name mv --type hashaggstate --count ---- count: 1 diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index cffcc92ac87ef..9b0d52c65ee29 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -45,7 +45,7 @@ risingwave_sqlsmith = { workspace = true } serde = "1.0.188" serde_derive = "1.0.188" serde_json = "1.0.107" -sqllogictest = "0.26.3" +sqllogictest = "0.27.1" tempfile = "3" tikv-jemallocator = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio" } diff --git a/src/tests/simulation/src/client.rs b/src/tests/simulation/src/client.rs index 1329b41857e90..d298e6685baee 100644 --- a/src/tests/simulation/src/client.rs +++ b/src/tests/simulation/src/client.rs @@ -212,6 +212,8 @@ impl sqllogictest::AsyncDB for RisingWave { } } + async fn shutdown(&mut self) {} + fn engine_name(&self) -> &str { "risingwave" }
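A note on the `shutdown` hook added to the simulation client in PATCH 6/6 above: sqllogictest 0.27 lets a driver release resources asynchronously before the harness drops it, which a synchronous `Drop` impl cannot await. The sketch below shows the shape of such a hook on a hypothetical `AsyncDb` trait — it is not sqllogictest's actual `AsyncDB` trait, and it assumes the `futures` crate solely to drive the example:

```rust
// Sketch of an async teardown hook on a test-driver trait. `AsyncDb` is a
// made-up stand-in that only mirrors the shape of the hook; a default no-op
// body lets simple drivers (like the RisingWave impl above) opt out.

trait AsyncDb {
    async fn run(&mut self, sql: &str) -> String;

    /// Called once by the harness after the last record; defaults to a no-op.
    async fn shutdown(&mut self) {}

    fn engine_name(&self) -> &str;
}

struct MockDb {
    open: bool,
}

impl AsyncDb for MockDb {
    async fn run(&mut self, sql: &str) -> String {
        format!("[{}] executed: {sql}", self.engine_name())
    }

    async fn shutdown(&mut self) {
        // Teardown that must await (e.g. closing a network connection)
        // belongs here rather than in a synchronous `Drop` impl.
        self.open = false;
    }

    fn engine_name(&self) -> &str {
        "mock"
    }
}

fn main() {
    // A trivial executor keeps the sketch free of a full async runtime.
    futures::executor::block_on(async {
        let mut db = MockDb { open: true };
        println!("{}", db.run("SELECT 1").await);
        db.shutdown().await;
        assert!(!db.open);
    });
}
```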