From 08a8193de36cc7a7481cab0a5a8efe9bed303920 Mon Sep 17 00:00:00 2001 From: Yuhui Date: Wed, 12 Jun 2024 14:30:16 +0800 Subject: [PATCH] [#3800]fix(trino-connector): EXPLAIN command can not show query optimize details (#3801) ### What changes were proposed in this pull request? The EXPLAIN command cannot display query optimization details when using the Gravitino Trino connector ### Why are the changes needed? Fix: #3800 ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? IT --- .../{bugs => }/00004_query_pushdown.sql | 0 .../jdbc-mysql/00004_query_pushdown.txt | 51 ++++++ .../jdbc-mysql/bugs/00004_query_pushdown.txt | 51 ------ .../{bugs => }/00003_join_pushdown.sql | 0 .../{bugs => }/00003_join_pushdown.txt | 0 .../{bugs => }/00004_query_pushdown.sql | 0 .../jdbc-postgresql/00004_query_pushdown.txt | 51 ++++++ .../bugs/00004_query_pushdown.txt | 51 ------ .../connector/GravitinoColumnHandle.java | 2 +- .../connector/GravitinoConnectorFactory.java | 2 +- .../trino/connector/GravitinoSplit.java | 7 + .../trino/connector/GravitinoTableHandle.java | 2 +- .../connector/GravitinoTransactionHandle.java | 7 + .../catalog/CatalogConnectorManager.java | 171 ++---------------- .../AlterCatalogStoredProcedure.java | 72 +++++++- .../CreateCatalogStoredProcedure.java | 51 +++++- .../DropCatalogStoredProcedure.java | 48 ++++- 17 files changed, 305 insertions(+), 261 deletions(-) rename integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/{bugs => }/00004_query_pushdown.sql (100%) create mode 100644 integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.txt delete mode 100644 integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.txt rename integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/{bugs => }/00003_join_pushdown.sql (100%) rename integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/{bugs => }/00003_join_pushdown.txt (100%) rename integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/{bugs => }/00004_query_pushdown.sql (100%) create mode 100644 integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.txt delete mode 100644 integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.txt diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.sql b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.sql similarity index 100% rename from integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.sql rename to integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.sql diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.txt new file mode 100644 index 00000000000..bef98d37b82 --- /dev/null +++ b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/00004_query_pushdown.txt @@ -0,0 +1,51 @@ +CREATE SCHEMA + +USE + +CREATE TABLE + +CREATE TABLE + +INSERT: 1500 rows + +INSERT: 15000 rows + +"Trino version: % +% + └─ TableScan[table = gt_mysql:gt_db1.customer->gt_db1.customer gt_db1.customer limit=10 columns=[custkey:bigint:BIGINT]] + Layout: [custkey:bigint] +% +" + +"Trino version: % +% + └─ ScanFilter[table = gt_mysql:gt_db1.customer->gt_db1.customer gt_db1.customer, filterPredicate = ""$like""(""phone"", ""$literal$""(from_base64('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAEAAAALAAAAAAsAAAAGAAAAJTIzNDIlAA==')))] + Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:decimal(12,2), mktsegment:varchar(10), comment:varchar(117)] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_mysql:gt_db1.orders->Query[SELECT sum(`totalprice`) AS `_pfgnrtd_0` FROM `gt_db1`.`orders`] columns=[_pfgnrtd_0:decimal(38,2):decimal]] + Layout: [_pfgnrtd:decimal(38,2)] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_mysql:gt_db1.orders->Query[SELECT `orderdate`, sum(`totalprice`) AS `_pfgnrtd_0` FROM `gt_db1`.`orders` GROUP BY `orderdate`] sortOrder=[orderdate:date:DATE ASC NULLS LAST] limit=10 columns=[orderdate:date:DATE, _pfgnrtd_0:decimal(38,2):decimal]] + Layout: [orderdate:date, _pfgnrtd:decimal(38,2)] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_mysql:gt_db1.%->Query[SELECT % INNER JOIN %] limit=10 columns=%]] +% +" + +DROP TABLE + +DROP TABLE + +DROP SCHEMA diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.txt deleted file mode 100644 index 5e8e51a098d..00000000000 --- a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-mysql/bugs/00004_query_pushdown.txt +++ /dev/null @@ -1,51 +0,0 @@ -CREATE SCHEMA - -USE - -CREATE TABLE - -CREATE TABLE - -INSERT: 1500 rows - -INSERT: 15000 rows - -"Trino version: % -% - └─ TableScan[table = gt_mysql:gt_db1.customer gt_db1.customer limit=10 columns=[custkey:bigint:BIGINT]] - Layout: [custkey:bigint] -% -" - -"Trino version: % -% - └─ ScanFilter[table = gt_mysql:gt_db1:customer, filterPredicate = ""$like""(""phone"", ""$literal$""(from_base64('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAEAAAALAAAAAAsAAAAGAAAAJTIzNDIlAA==')))] - Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:decimal(12,2), mktsegment:varchar(10), comment:varchar(117)] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_mysql:Query[SELECT sum(`totalprice`) AS `_pfgnrtd_0` FROM `gt_db1`.`orders`] columns=[_pfgnrtd_0:decimal(38,2):decimal]] - Layout: [_pfgnrtd:decimal(38,2)] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_mysql:Query[SELECT `orderdate`, sum(`totalprice`) AS `_pfgnrtd_0` FROM `gt_db1`.`orders` GROUP BY `orderdate`] sortOrder=[orderdate:date:DATE ASC NULLS LAST] limit=10 columns=[orderdate:date:DATE, _pfgnrtd_0:decimal(38,2):decimal]] - Layout: [orderdate:date, _pfgnrtd:decimal(38,2)] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_mysql:Query[SELECT % INNER JOIN %] limit=10 columns=%]] -% -" - -DROP TABLE - -DROP TABLE - -DROP SCHEMA diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00003_join_pushdown.sql b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00003_join_pushdown.sql similarity index 100% rename from integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00003_join_pushdown.sql rename to integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00003_join_pushdown.sql diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00003_join_pushdown.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00003_join_pushdown.txt similarity index 100% rename from integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00003_join_pushdown.txt rename to integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00003_join_pushdown.txt diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.sql b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.sql similarity index 100% rename from integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.sql rename to integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.sql diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.txt new file mode 100644 index 00000000000..cf7cbeab737 --- /dev/null +++ b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/00004_query_pushdown.txt @@ -0,0 +1,51 @@ +CREATE SCHEMA + +USE + +CREATE TABLE + +CREATE TABLE + +INSERT: 1500 rows + +INSERT: 15000 rows + +"Trino version: % +% + └─ TableScan[table = gt_postgresql:gt_db1.customer->gt_db1.customer gt_db1.customer limit=10 columns=[custkey:bigint:int8]] + Layout: [custkey:bigint] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_postgresql:gt_db1.customer->gt_db1.customer gt_db1.customer constraints=[ParameterizedExpression[expression=(""phone"") LIKE (?), parameters=[QueryParameter{jdbcType=Optional.empty, type=varchar(6), value=Optional[Slice{base=[B@%, baseOffset=0, length=6}]}]]] limit=10] + Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:decimal(12,2), mktsegment:varchar(10), comment:varchar(117)] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_postgresql:gt_db1.orders->Query[SELECT sum(""totalprice"") AS ""_pfgnrtd_0"" FROM ""gt_db1"".""orders""] columns=[_pfgnrtd_0:decimal(38,2):decimal]] + Layout: [_pfgnrtd:decimal(38,2)] +% +" + +"Trino version: % +% + └─ TableScan[table = gt_postgresql:gt_db1.%->Query[SELECT ""orderdate"", sum(""totalprice"") AS ""_pfgnrtd_0"" FROM ""gt_db1"".""orders"" GROUP BY ""orderdate""] sortOrder=[orderdate:date:date ASC NULLS LAST] limit=10 columns=[orderdate:date:date, _pfgnrtd_0:decimal(38,2):decimal]] + Layout: [orderdate:date, _pfgnrtd:decimal(38,2)] +% +" + +"Trino version: % +% + TableScan[table = gt_postgresql:gt_db1.orders->Query[SELECT % INNER JOIN %] limit=10 columns=%] +% +" + +DROP TABLE + +DROP TABLE + +DROP SCHEMA \ No newline at end of file diff --git a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.txt b/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.txt deleted file mode 100644 index a823283fe1e..00000000000 --- a/integration-test/src/test/resources/trino-ci-testset/testsets/jdbc-postgresql/bugs/00004_query_pushdown.txt +++ /dev/null @@ -1,51 +0,0 @@ -CREATE SCHEMA - -USE - -CREATE TABLE - -CREATE TABLE - -INSERT: 1500 rows - -INSERT: 15000 rows - -"Trino version: % -% - └─ TableScan[table = gt_postgresql:gt_db1.customer gt_db1.customer limit=10 columns=[custkey:bigint:int8]] - Layout: [custkey:bigint] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_postgresql:gt_db1.customer gt_db1.customer constraints=[ParameterizedExpression[expression=(""phone"") LIKE (?), parameters=[QueryParameter{jdbcType=Optional.empty, type=varchar(6), value=Optional[Slice{base=[B@%, baseOffset=0, length=6}]}]]] limit=10] - Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:decimal(12,2), mktsegment:varchar(10), comment:varchar(117)] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_postgresql:Query[SELECT sum(""totalprice"") AS ""_pfgnrtd_0"" FROM ""gt_db1"".""orders""] columns=[_pfgnrtd_0:decimal(38,2):decimal]] - Layout: [_pfgnrtd:decimal(38,2)] -% -" - -"Trino version: % -% - └─ TableScan[table = gt_postgresql:Query[SELECT ""orderdate"", sum(""totalprice"") AS ""_pfgnrtd_0"" FROM ""gt_db1"".""orders"" GROUP BY ""orderdate""] sortOrder=[orderdate:date:date ASC NULLS LAST] limit=10 columns=[orderdate:date:date, _pfgnrtd_0:decimal(38,2):decimal]] - Layout: [orderdate:date, _pfgnrtd:decimal(38,2)] -% -" - -"Trino version: % -% - TableScan[table = gt_postgresql:Query[SELECT % INNER JOIN %] limit=10 columns=%] -% -" - -DROP TABLE - -DROP TABLE - -DROP SCHEMA \ No newline at end of file diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoColumnHandle.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoColumnHandle.java index 1da24118723..7a8d0883993 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoColumnHandle.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoColumnHandle.java @@ -71,6 +71,6 @@ public boolean equals(Object obj) { @Override public String toString() { - return columnName; + return columnName + "->" + getInternalHandle().toString(); } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java index a1605b8e8cf..7407181c12d 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java @@ -30,7 +30,7 @@ public class GravitinoConnectorFactory implements ConnectorFactory { private static final Logger LOG = LoggerFactory.getLogger(GravitinoConnectorFactory.class); - private static final String DEFAULT_CONNECTOR_NAME = "gravitino"; + public static final String DEFAULT_CONNECTOR_NAME = "gravitino"; @SuppressWarnings("UnusedVariable") private GravitinoSystemTableFactory gravitinoSystemTableFactory; diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplit.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplit.java index 80b6fc2dfc9..29a43d00007 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplit.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplit.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.trino.connector; +import static com.datastrato.gravitino.trino.connector.GravitinoConnectorFactory.DEFAULT_CONNECTOR_NAME; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.HostAddress; @@ -63,4 +65,9 @@ public SplitWeight getSplitWeight() { public long getRetainedSizeInBytes() { return handleWrapper.getHandle().getRetainedSizeInBytes(); } + + @Override + public String toString() { + return DEFAULT_CONNECTOR_NAME + "->" + getInternalHandle().toString(); + } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTableHandle.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTableHandle.java index b60282f893a..9efe29fc3fe 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTableHandle.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTableHandle.java @@ -95,6 +95,6 @@ public boolean equals(Object obj) { @Override public String toString() { - return schemaName + ":" + tableName; + return String.format("%s.%s->%s", schemaName, tableName, getInternalHandle().toString()); } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTransactionHandle.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTransactionHandle.java index 06fd46d3d42..f92df91d441 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTransactionHandle.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoTransactionHandle.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.trino.connector; +import static com.datastrato.gravitino.trino.connector.GravitinoConnectorFactory.DEFAULT_CONNECTOR_NAME; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -37,4 +39,9 @@ public String getHandleString() { public ConnectorTransactionHandle getInternalHandle() { return handleWrapper.getHandle(); } + + @Override + public String toString() { + return DEFAULT_CONNECTOR_NAME + "->" + getInternalHandle().toString(); + } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java index fb76cfdaeaa..eda24754ed4 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java @@ -4,21 +4,14 @@ */ package com.datastrato.gravitino.trino.connector.catalog; -import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_ALREADY_EXISTS; -import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_NOT_EXISTS; import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR; import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS; import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG; import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_OPERATION_FAILED; -import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION; import com.datastrato.gravitino.Catalog; -import com.datastrato.gravitino.CatalogChange; -import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.client.GravitinoAdminClient; import com.datastrato.gravitino.client.GravitinoMetalake; -import com.datastrato.gravitino.exceptions.CatalogAlreadyExistsException; -import com.datastrato.gravitino.exceptions.NoSuchCatalogException; import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; import com.datastrato.gravitino.trino.connector.GravitinoConfig; import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog; @@ -27,7 +20,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -241,6 +233,10 @@ public CatalogConnectorContext getCatalogConnector(String catalogName) { return catalogConnectors.get(catalogName); } + public boolean catalogConnectorExist(String catalogName) { + return catalogConnectors.containsKey(catalogName); + } + public List getCatalogs() { return catalogConnectors.values().stream().map(CatalogConnectorContext::getCatalog).toList(); } @@ -258,151 +254,6 @@ public String getTrinoCatalogName(GravitinoCatalog catalog) { return getTrinoCatalogName(catalog.getMetalake(), catalog.getName()); } - public void createCatalog( - String metalakeName, - String catalogName, - String provider, - Map properties, - boolean ignoreExist) { - if (catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) { - if (!ignoreExist) { - throw new TrinoException( - GRAVITINO_CATALOG_ALREADY_EXISTS, - String.format( - "Catalog %s already exists.", NameIdentifier.of(metalakeName, catalogName))); - } - return; - } - - try { - GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName); - metalake.createCatalog( - catalogName, Catalog.Type.RELATIONAL, provider, "Trino created", properties); - - LOG.info("Create catalog {} in metalake {} successfully.", catalogName, metalake); - - Future future = executorService.submit(this::loadMetalake); - future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); - - if (!catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) { - throw new TrinoException( - GRAVITINO_OPERATION_FAILED, "Create catalog failed due to the loading process fails"); - } - } catch (NoSuchMetalakeException e) { - throw new TrinoException( - GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists."); - } catch (CatalogAlreadyExistsException e) { - throw new TrinoException( - GRAVITINO_CATALOG_ALREADY_EXISTS, - "Catalog " - + NameIdentifier.of(metalakeName, catalogName) - + " already exists in the server."); - } catch (Exception e) { - throw new TrinoException( - GRAVITINO_UNSUPPORTED_OPERATION, "Create catalog failed. " + e.getMessage(), e); - } - } - - public void dropCatalog(String metalakeName, String catalogName, boolean ignoreNotExist) { - try { - GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName); - if (!metalake.catalogExists(catalogName)) { - if (ignoreNotExist) { - return; - } - - throw new TrinoException( - GRAVITINO_CATALOG_NOT_EXISTS, - "Catalog " + NameIdentifier.of(metalakeName, catalogName) + " not exists."); - } - boolean dropped = metalake.dropCatalog(catalogName); - if (!dropped) { - throw new TrinoException( - GRAVITINO_UNSUPPORTED_OPERATION, "Failed to drop no empty catalog " + catalogName); - } - LOG.info("Drop catalog {} in metalake {} successfully.", catalogName, metalake); - - Future future = executorService.submit(this::loadMetalake); - future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); - - if (catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) { - throw new TrinoException( - GRAVITINO_OPERATION_FAILED, "Drop catalog failed due to the reloading process fails"); - } - } catch (NoSuchMetalakeException e) { - throw new TrinoException( - GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists."); - } catch (Exception e) { - throw new TrinoException( - GRAVITINO_UNSUPPORTED_OPERATION, "Drop catalog failed. " + e.getMessage(), e); - } - } - - public void alterCatalog( - String metalakeName, - String catalogName, - Map setProperties, - List removeProperties) { - NameIdentifier catalog = NameIdentifier.of(metalakeName, catalogName); - try { - CatalogConnectorContext catalogConnectorContext = - catalogConnectors.get(getTrinoCatalogName(metalakeName, catalogName)); - GravitinoCatalog oldCatalog = catalogConnectorContext.getCatalog(); - - List changes = new ArrayList<>(); - setProperties - .entrySet() - .forEach( - e -> { - // Skip the no changed attributes - boolean matched = - oldCatalog.getProperties().entrySet().stream() - .anyMatch( - oe -> - oe.getKey().equals(e.getKey()) - && oe.getValue().equals(e.getValue())); - if (!matched) { - changes.add(CatalogChange.setProperty(e.getKey(), e.getValue())); - } - }); - - removeProperties.forEach( - key -> { - if (oldCatalog.getProperties().containsKey(key)) { - changes.add(CatalogChange.removeProperty(key)); - } - }); - - if (changes.isEmpty()) { - return; - } - - GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName); - metalake.alterCatalog(catalogName, changes.toArray(changes.toArray(new CatalogChange[0]))); - - Future future = executorService.submit(this::loadMetalake); - future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); - - catalogConnectorContext = - catalogConnectors.get(getTrinoCatalogName(metalakeName, catalogName)); - if (catalogConnectorContext == null - || catalogConnectorContext.getCatalog().getLastModifiedTime() - == oldCatalog.getLastModifiedTime()) { - throw new TrinoException( - GRAVITINO_OPERATION_FAILED, "Update catalog failed due to the reloading process fails"); - } - - } catch (NoSuchMetalakeException e) { - throw new TrinoException( - GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists."); - } catch (NoSuchCatalogException e) { - throw new TrinoException(GRAVITINO_CATALOG_NOT_EXISTS, "Catalog " + catalog + " not exists."); - } catch (Exception e) { - throw new TrinoException( - GRAVITINO_UNSUPPORTED_OPERATION, "alter catalog failed. " + e.getMessage(), e); - } - } - public void addMetalake(String metalake) { if (config.simplifyCatalogNames() && usedMetalakes.size() > 1) throw new TrinoException( @@ -437,4 +288,18 @@ public Connector createConnector( GRAVITINO_OPERATION_FAILED, "Failed to create connector: " + connectorName, e); } } + + public void loadMetalakeSync() throws Exception { + Future future = executorService.submit(this::loadMetalake); + future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); + } + + public GravitinoMetalake getMetalake(String metalake) { + if (!usedMetalakes.contains(metalake)) { + throw new TrinoException( + GRAVITINO_OPERATION_FAILED, + "This connector does not allowed to access metalake " + metalake); + } + return metalakes.computeIfAbsent(metalake, this::retrieveMetalake); + } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/AlterCatalogStoredProcedure.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/AlterCatalogStoredProcedure.java index 35e2f8f6822..73e27828c98 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/AlterCatalogStoredProcedure.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/AlterCatalogStoredProcedure.java @@ -4,10 +4,21 @@ */ package com.datastrato.gravitino.trino.connector.system.storedprocdure; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_NOT_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_OPERATION_FAILED; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION; import static com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTable.SYSTEM_TABLE_SCHEMA_NAME; import static io.trino.spi.type.VarcharType.VARCHAR; +import com.datastrato.gravitino.CatalogChange; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.NoSuchCatalogException; +import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; +import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorContext; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager; +import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog; +import io.trino.spi.TrinoException; import io.trino.spi.block.ArrayBlock; import io.trino.spi.procedure.Procedure; import io.trino.spi.type.ArrayType; @@ -15,11 +26,15 @@ import io.trino.spi.type.TypeOperators; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AlterCatalogStoredProcedure extends GravitinoStoredProcedure { + private static final Logger LOG = LoggerFactory.getLogger(AlterCatalogStoredProcedure.class); private final CatalogConnectorManager catalogConnectorManager; private final String metalake; @@ -56,6 +71,61 @@ public Procedure createStoredProcedure() throws NoSuchMethodException, IllegalAc public void alterCatalog( String catalogName, Map setProperties, List removeProperties) { - catalogConnectorManager.alterCatalog(metalake, catalogName, setProperties, removeProperties); + try { + CatalogConnectorContext catalogConnectorContext = + catalogConnectorManager.getCatalogConnector( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName)); + GravitinoCatalog oldCatalog = catalogConnectorContext.getCatalog(); + + List changes = new ArrayList<>(); + setProperties.forEach( + (key, value) -> { + // Skip the no changed attributes + boolean matched = + oldCatalog.getProperties().entrySet().stream() + .anyMatch(oe -> oe.getKey().equals(key) && oe.getValue().equals(value)); + if (!matched) { + changes.add(CatalogChange.setProperty(key, value)); + } + }); + + removeProperties.forEach( + key -> { + if (oldCatalog.getProperties().containsKey(key)) { + changes.add(CatalogChange.removeProperty(key)); + } + }); + + if (changes.isEmpty()) { + return; + } + + catalogConnectorContext + .getMetalake() + .alterCatalog(catalogName, changes.toArray(changes.toArray(new CatalogChange[0]))); + + catalogConnectorManager.loadMetalakeSync(); + catalogConnectorContext = + catalogConnectorManager.getCatalogConnector( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName)); + if (catalogConnectorContext == null + || catalogConnectorContext.getCatalog().getLastModifiedTime() + == oldCatalog.getLastModifiedTime()) { + throw new TrinoException( + GRAVITINO_OPERATION_FAILED, "Update catalog failed due to the reloading process fails"); + } + LOG.info("Alter catalog {} in metalake {} successfully.", catalogName, metalake); + + } catch (NoSuchMetalakeException e) { + throw new TrinoException( + GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalake + " not exists."); + } catch (NoSuchCatalogException e) { + throw new TrinoException( + GRAVITINO_CATALOG_NOT_EXISTS, + "Catalog " + NameIdentifier.of(metalake, catalogName) + " not exists."); + } catch (Exception e) { + throw new TrinoException( + GRAVITINO_UNSUPPORTED_OPERATION, "alter catalog failed. " + e.getMessage(), e); + } } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/CreateCatalogStoredProcedure.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/CreateCatalogStoredProcedure.java index b074805b8f5..4b7ad6b7d48 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/CreateCatalogStoredProcedure.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/CreateCatalogStoredProcedure.java @@ -4,11 +4,20 @@ */ package com.datastrato.gravitino.trino.connector.system.storedprocdure; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_ALREADY_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_OPERATION_FAILED; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION; import static com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTable.SYSTEM_TABLE_SCHEMA_NAME; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.VarcharType.VARCHAR; +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.CatalogAlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager; +import io.trino.spi.TrinoException; import io.trino.spi.procedure.Procedure; import io.trino.spi.type.MapType; import io.trino.spi.type.TypeOperators; @@ -16,8 +25,11 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CreateCatalogStoredProcedure extends GravitinoStoredProcedure { + private static final Logger LOG = LoggerFactory.getLogger(CreateCatalogStoredProcedure.class); private final CatalogConnectorManager catalogConnectorManager; private final String metalake; @@ -51,6 +63,43 @@ public Procedure createStoredProcedure() throws NoSuchMethodException, IllegalAc public void createCatalog( String catalogName, String provider, Map properties, boolean ignoreExist) { - catalogConnectorManager.createCatalog(metalake, catalogName, provider, properties, ignoreExist); + boolean exists = + catalogConnectorManager.catalogConnectorExist( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName)); + if (exists) { + if (!ignoreExist) { + throw new TrinoException( + GRAVITINO_CATALOG_ALREADY_EXISTS, + String.format("Catalog %s already exists.", NameIdentifier.of(metalake, catalogName))); + } + return; + } + + try { + catalogConnectorManager + .getMetalake(metalake) + .createCatalog( + catalogName, Catalog.Type.RELATIONAL, provider, "Trino created", properties); + + catalogConnectorManager.loadMetalakeSync(); + if (!catalogConnectorManager.catalogConnectorExist( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName))) { + throw new TrinoException( + GRAVITINO_OPERATION_FAILED, "Create catalog failed due to the loading process fails"); + } + + LOG.info("Create catalog {} in metalake {} successfully.", catalogName, metalake); + + } catch (NoSuchMetalakeException e) { + throw new TrinoException( + GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalake + " not exists."); + } catch (CatalogAlreadyExistsException e) { + throw new TrinoException( + GRAVITINO_CATALOG_ALREADY_EXISTS, + "Catalog " + NameIdentifier.of(metalake, catalogName) + " already exists in the server."); + } catch (Exception e) { + throw new TrinoException( + GRAVITINO_UNSUPPORTED_OPERATION, "Create catalog failed. " + e.getMessage(), e); + } } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/DropCatalogStoredProcedure.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/DropCatalogStoredProcedure.java index 3d00bdb5e39..70dce146ef9 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/DropCatalogStoredProcedure.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/system/storedprocdure/DropCatalogStoredProcedure.java @@ -4,17 +4,28 @@ */ package com.datastrato.gravitino.trino.connector.system.storedprocdure; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_NOT_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_OPERATION_FAILED; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION; import static com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTable.SYSTEM_TABLE_SCHEMA_NAME; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.VarcharType.VARCHAR; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; +import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorContext; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager; +import io.trino.spi.TrinoException; import io.trino.spi.procedure.Procedure; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DropCatalogStoredProcedure extends GravitinoStoredProcedure { + private static final Logger LOG = LoggerFactory.getLogger(DropCatalogStoredProcedure.class); private final CatalogConnectorManager catalogConnectorManager; private final String metalake; @@ -42,6 +53,41 @@ public Procedure createStoredProcedure() throws NoSuchMethodException, IllegalAc } public void dropCatalog(String catalogName, boolean ignoreNotExist) { - catalogConnectorManager.dropCatalog(metalake, catalogName, ignoreNotExist); + try { + CatalogConnectorContext catalogConnector = + catalogConnectorManager.getCatalogConnector( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName)); + if (catalogConnector == null) { + if (ignoreNotExist) { + return; + } + + throw new TrinoException( + GRAVITINO_CATALOG_NOT_EXISTS, + "Catalog " + NameIdentifier.of(metalake, catalogName) + " not exists."); + } + boolean dropped = catalogConnector.getMetalake().dropCatalog(catalogName); + if (!dropped) { + throw new TrinoException( + GRAVITINO_UNSUPPORTED_OPERATION, "Failed to drop no empty catalog " + catalogName); + } + + catalogConnectorManager.loadMetalakeSync(); + + if (catalogConnectorManager.catalogConnectorExist( + catalogConnectorManager.getTrinoCatalogName(metalake, catalogName))) { + throw new TrinoException( + GRAVITINO_OPERATION_FAILED, "Drop catalog failed due to the reloading process fails"); + } + + LOG.info("Drop catalog {} in metalake {} successfully.", catalogName, metalake); + + } catch (NoSuchMetalakeException e) { + throw new TrinoException( + GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalake + " not exists."); + } catch (Exception e) { + throw new TrinoException( + GRAVITINO_UNSUPPORTED_OPERATION, "Drop catalog failed. " + e.getMessage(), e); + } } }