diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 6cbfbfa53b4..c7d9c4fe7a7 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -29,7 +29,7 @@ repositories { var paimonVersion: String = libs.versions.paimon.get() val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") - +val flinkJdbcVersion: String = "3.2.0-1.18" val icebergVersion: String = libs.versions.iceberg.get() // The Flink only support scala 2.12, and all scala api will be removed in a future version. @@ -56,6 +56,7 @@ dependencies { compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion") + compileOnly("org.apache.flink:flink-connector-jdbc:$flinkJdbcVersion") compileOnly(libs.hive2.exec) { artifact { diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index e9320c786cd..1c875920642 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -101,7 +101,7 @@ protected BaseCatalog( this.partitionConverter = partitionConverter; } - protected abstract AbstractCatalog realCatalog(); + protected abstract org.apache.flink.table.catalog.Catalog realCatalog(); @Override public void open() throws CatalogException {} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java new file mode 100644 index 00000000000..30cb4886dc6 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; + +public class GravitinoJdbcCatalog extends BaseCatalog { + + Catalog jdbcCatalog; + + protected GravitinoJdbcCatalog( + CatalogFactory.Context context, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { + super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter); + JdbcCatalogFactory jdbcCatalogFactory = new JdbcCatalogFactory(); + this.jdbcCatalog = jdbcCatalogFactory.createCatalog(context); + } + + @Override + protected Catalog realCatalog() { + return jdbcCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java new file mode 100644 index 00000000000..363f45c43bc --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactoryOptions; + +public class GravitinoJdbcCatalogFactory implements BaseCatalogFactory { + + @Override + public String gravitinoCatalogProvider() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Catalog.Type gravitinoCatalogType() { + return Catalog.Type.RELATIONAL; + } + + @Override + public PropertiesConverter propertiesConverter() { + return JdbcPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java new file mode 100644 index 00000000000..a38c3c236b1 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +public class GravitinoJdbcCatalogFactoryOptions { + + public static final String IDENTIFIER = "gravitino-jdbc"; +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java new file mode 100644 index 00000000000..e4e42404f87 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.HashMap; +import java.util.Map; + +public class JdbcPropertiesConstants { + + private JdbcPropertiesConstants() {} + + public static final String GRAVITINO_JDBC_USER = "jdbc-user"; + public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password"; + public static final String GRAVITINO_JDBC_URL = "jdbc-url"; + public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver"; + public static final String GRAVITINO_JDBC_DEFAULT_DATABASE = "default-database"; + + public static final String FLINK_JDBC_URL = "base-url"; + public static final String FLINK_JDBC_USER = "user"; + public static final String FLINK_JDBC_PASSWORD = "password"; + public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database"; + + public static Map flinkToGravitinoMap = new HashMap<>(); + public static Map gravitinoToFlinkMap = new HashMap<>(); + + static { + flinkToGravitinoMap.put(FLINK_JDBC_URL, GRAVITINO_JDBC_URL); + flinkToGravitinoMap.put(FLINK_JDBC_USER, GRAVITINO_JDBC_USER); + flinkToGravitinoMap.put(FLINK_JDBC_PASSWORD, GRAVITINO_JDBC_PASSWORD); + flinkToGravitinoMap.put(FLINK_JDBC_DEFAULT_DATABASE, GRAVITINO_JDBC_DEFAULT_DATABASE); + + gravitinoToFlinkMap.put(GRAVITINO_JDBC_URL, FLINK_JDBC_URL); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_USER, FLINK_JDBC_USER); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_PASSWORD, FLINK_JDBC_PASSWORD); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_DEFAULT_DATABASE, FLINK_JDBC_DEFAULT_DATABASE); + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java new file mode 100644 index 00000000000..41a2f75715e --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public class JdbcPropertiesConverter implements PropertiesConverter { + + public static final JdbcPropertiesConverter INSTANCE = new JdbcPropertiesConverter(); + + private JdbcPropertiesConverter() {} + + @Override + public String transformPropertyToGravitinoCatalog(String configKey) { + return JdbcPropertiesConstants.flinkToGravitinoMap.get(configKey); + } + + @Override + public String transformPropertyToFlinkCatalog(String configKey) { + return JdbcPropertiesConstants.gravitinoToFlinkMap.get(configKey); + } + + @Override + public String getFlinkCatalogType() { + return GravitinoJdbcCatalogFactoryOptions.IDENTIFIER; + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 959123f3362..962f7f2f847 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -66,13 +66,14 @@ public abstract class FlinkEnvIT extends BaseIT { private static String gravitinoUri = "http://127.0.0.1:8090"; @BeforeAll - void startUp() { + void startUp() throws Exception { // Start Gravitino server initGravitinoEnv(); initMetalake(); initHiveEnv(); initHdfsEnv(); initFlinkEnv(); + initCatalogEnv(); LOG.info("Startup Flink env successfully, Gravitino uri: {}.", gravitinoUri); } @@ -83,6 +84,8 @@ static void stop() { LOG.info("Stop Flink env successfully."); } + protected void initCatalogEnv() throws Exception {}; + protected String flinkByPass(String key) { return PropertiesConverter.FLINK_PROPERTY_PREFIX + key; } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java new file mode 100644 index 00000000000..ae2492e6e26 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.integration.test.jdbc; + +import static org.apache.gravitino.integration.test.util.TestDatabaseName.MYSQL_CATALOG_MYSQL_IT; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.jdbc.JdbcPropertiesConstants; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; + +@Tag("gravitino-docker-test") +public class FlinkJdbcMysqlCatalogIT extends FlinkCommonIT { + + protected String mysqlUrl; + protected String mysqlUsername; + protected String mysqlPassword; + protected String mysqlDriver; + protected String mysqlDefaultDatabase = MYSQL_CATALOG_MYSQL_IT.name(); + + protected Catalog catalog; + + protected static final String CATALOG_NAME = "test_flink_jdbc_catalog"; + + @Override + protected Catalog currentCatalog() { + return catalog; + } + + @Override + protected String getProvider() { + return "jdbc-mysql"; + } + + @BeforeAll + void setup() { + init(); + } + + @Override + protected boolean supportDropCascade() { + return true; + } + + private void init() { + Preconditions.checkNotNull(metalake); + catalog = + metalake.createCatalog( + CATALOG_NAME, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + ImmutableMap.of( + JdbcPropertiesConstants.GRAVITINO_JDBC_USER, + mysqlUsername, + JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, + mysqlPassword, + JdbcPropertiesConstants.GRAVITINO_JDBC_URL, + mysqlUrl, + JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, + mysqlDriver, + JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE, + mysqlDefaultDatabase)); + } + + @Override + protected void initCatalogEnv() throws Exception { + ContainerSuite containerSuite = ContainerSuite.getInstance(); + containerSuite.startMySQLContainer(MYSQL_CATALOG_MYSQL_IT); + mysqlUrl = containerSuite.getMySQLContainer().getJdbcUrl(); + mysqlUsername = containerSuite.getMySQLContainer().getUsername(); + mysqlPassword = containerSuite.getMySQLContainer().getPassword(); + mysqlDriver = containerSuite.getMySQLContainer().getDriverClassName(MYSQL_CATALOG_MYSQL_IT); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestJdbcPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestJdbcPropertiesConverter.java new file mode 100644 index 00000000000..1b56c63c224 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestJdbcPropertiesConverter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link JdbcPropertiesConverter} */ +public class TestJdbcPropertiesConverter { + + String username = "testUser"; + String password = "testPassword"; + String url = "testUrl"; + String defaultDatabase = "test"; + + private static final JdbcPropertiesConverter CONVERTER = JdbcPropertiesConverter.INSTANCE; + + @Test + public void testToPaimonFileSystemCatalog() { + Map catalogProperties = + ImmutableMap.of( + JdbcPropertiesConstants.GRAVITINO_JDBC_USER, + username, + JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, + password, + JdbcPropertiesConstants.GRAVITINO_JDBC_URL, + url, + JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE, + defaultDatabase); + Map properties = CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.FLINK_JDBC_USER)); + Assertions.assertEquals(password, properties.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD)); + Assertions.assertEquals(url, properties.get(JdbcPropertiesConstants.FLINK_JDBC_URL)); + Assertions.assertEquals( + defaultDatabase, properties.get(JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE)); + } + + @Test + public void testToGravitinoCatalogProperties() { + Configuration configuration = + Configuration.fromMap( + ImmutableMap.of( + JdbcPropertiesConstants.FLINK_JDBC_USER, + username, + JdbcPropertiesConstants.FLINK_JDBC_PASSWORD, + password, + JdbcPropertiesConstants.FLINK_JDBC_URL, + url, + JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE, + defaultDatabase)); + Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); + + Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_USER)); + Assertions.assertEquals( + password, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD)); + Assertions.assertEquals(url, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_URL)); + Assertions.assertEquals( + defaultDatabase, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE)); + } +}