From c88506f46e92f2a5d3c6c8f4bdf821d90b6cc9f1 Mon Sep 17 00:00:00 2001 From: FANNG Date: Mon, 8 Apr 2024 14:16:40 +0800 Subject: [PATCH] [#2388] feat(spark-connector): filter non relational catalogs before register to spark catalog manager (#2830) ### What changes were proposed in this pull request? filter non relational catalogs before register to spark catalog manager ### Why are the changes needed? Fix: #2388 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing test --- .../catalog/GravitinoCatalogManager.java | 18 +++++++++--------- .../plugin/GravitinoDriverPlugin.java | 5 +++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java index b9c4d5229f0..e00f63b6203 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java @@ -73,15 +73,15 @@ public String getMetalakeName() { return metalakeName; } - public Set listCatalogs() { - NameIdentifier[] catalogNames = metalake.listCatalogs(Namespace.ofCatalog(metalake.name())); - LOG.info( - "Load metalake {}'s catalogs. catalogs: {}.", - metalake.name(), - Arrays.toString(catalogNames)); - return Arrays.stream(catalogNames) - .map(identifier -> identifier.name()) - .collect(Collectors.toSet()); + public void loadRelationalCatalogs() { + Catalog[] catalogs = metalake.listCatalogsInfo(Namespace.ofCatalog(metalake.name())); + Arrays.stream(catalogs) + .filter(catalog -> Catalog.Type.RELATIONAL.equals(catalog.type())) + .forEach(catalog -> gravitinoCatalogs.put(catalog.name(), catalog)); + } + + public Set getCatalogNames() { + return gravitinoCatalogs.asMap().keySet().stream().collect(Collectors.toSet()); } private Catalog loadCatalog(String catalogName) { diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java index af728cc7325..88235c3877e 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java @@ -44,8 +44,9 @@ public Map init(SparkContext sc, PluginContext pluginContext) { "%s:%s, should not be empty", GravitinoSparkConfig.GRAVITINO_METALAKE, metalake)); catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake); - Set catalogs = catalogManager.listCatalogs(); - registerGravitinoCatalogs(conf, catalogs); + catalogManager.loadRelationalCatalogs(); + Set catalogNames = catalogManager.getCatalogNames(); + registerGravitinoCatalogs(conf, catalogNames); registerSqlExtensions(); return Collections.emptyMap(); }