diff --git a/connectors/connector-iceberg/iceberg-bridge/pom.xml b/connectors/connector-iceberg/iceberg-bridge/pom.xml
new file mode 100644
index 000000000..dd8f92e53
--- /dev/null
+++ b/connectors/connector-iceberg/iceberg-bridge/pom.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>alink_connectors_iceberg</artifactId>
+        <groupId>com.alibaba.alink</groupId>
+        <version>1.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>alink_iceberg_bridge_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
+    <packaging>jar</packaging>
+    <name>alink-iceberg-bridge</name>
+
+    <properties>
+        <hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
+        <flink.shaded.version>9.0</flink.shaded.version>
+        <hive.version>2.3.4</hive.version>
+        <iceberg.version>0.12.0</iceberg.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+            <version>${hivemetastore.hadoop.version}-${flink.shaded.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-hive_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-vector-code-gen</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-httpclient</groupId>
+                    <artifactId>commons-httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>ST4</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ant</groupId>
+                    <artifactId>ant</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ivy</groupId>
+                    <artifactId>ivy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>apache-curator</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.groovy</groupId>
+                    <artifactId>groovy-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>stax</groupId>
+                    <artifactId>stax-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>apache-log4j-extras</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.21</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java b/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java
new file mode 100644
index 000000000..4925b2c10
--- /dev/null
+++ b/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java
@@ -0,0 +1,57 @@
+package org.apache.flink.iceberg;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iceberg.source.FlinkInputFormat;
+import org.apache.flink.iceberg.source.FlinkSource;
+import org.apache.flink.iceberg.util.FlinkCompatibilityUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public class InputOutputFormat {
+
+    public static Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
+        StreamExecutionEnvironment execEnv, Catalog catalog, ObjectPath objectPath) {
+
+        if (!(catalog instanceof FlinkCatalog)) {
+            throw new RuntimeException("Catalog should be an Iceberg FlinkCatalog.");
+        }
+
+        TableLoader tableLoader = createTableLoader((FlinkCatalog) catalog, objectPath);
+
+        // Load the table once to capture its schema; FlinkSource reopens the
+        // loader itself when the format is executed.
+        Table table;
+        Schema icebergSchema;
+        tableLoader.open();
+        try (TableLoader loader = tableLoader) {
+            table = loader.loadTable();
+            icebergSchema = table.schema();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema));
+        TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema));
+
+        FlinkInputFormat flinkInputFormat = FlinkSource.forRowData()
+            .env(execEnv)
+            .tableLoader(tableLoader)
+            .table(table)
+            .buildFormat();
+
+        return Tuple3.<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, ?>>of(
+            tableSchema, typeInfo, flinkInputFormat);
+    }
+
+    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+    }
+}
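For orientation, a minimal sketch of how the triple returned by createInputFormat is meant to be consumed (IcebergCatalog, later in this patch, does the same through reflection). The catalog instance and the database/table names are placeholders, and imports are as in the file above:

    // Sketch, not part of the patch: `flinkCatalog` stands for an opened
    // org.apache.flink.iceberg.FlinkCatalog registered for the target warehouse.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, ?>> all =
        InputOutputFormat.createInputFormat(env, flinkCatalog, new ObjectPath("db", "tbl"));
    // f0 describes the columns, f1 the runtime row type, f2 the input format;
    // together they define a stream of the table's rows.
    DataStream<RowData> stream = env.createInput(all.f2, all.f1);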
diff --git a/connectors/connector-iceberg/pom.xml b/connectors/connector-iceberg/pom.xml
new file mode 100644
index 000000000..36f4fafc6
--- /dev/null
+++ b/connectors/connector-iceberg/pom.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>alink_connectors</artifactId>
+        <groupId>com.alibaba.alink</groupId>
+        <version>1.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>alink_connectors_iceberg</artifactId>
+    <packaging>pom</packaging>
+    <name>alink-connector-iceberg</name>
+
+    <modules>
+        <module>iceberg-bridge</module>
+    </modules>
+</project>
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 32a65181e..16e11792d 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -19,6 +19,7 @@
         <module>connector-odps</module>
         <module>connector-jdbc</module>
         <module>connector-hive</module>
+        <module>connector-iceberg</module>
         <module>filesystem</module>
     </modules>
 </project>
diff --git a/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java b/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java
new file mode 100644
index 000000000..1ed225c37
--- /dev/null
+++ b/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java
@@ -0,0 +1,513 @@
+package com.alibaba.alink.common.io.catalog;
+
+import com.alibaba.alink.common.MLEnvironmentFactory;
+import com.alibaba.alink.common.io.annotations.CatalogAnnotation;
+import com.alibaba.alink.common.io.catalog.plugin.IcebergClassLoaderFactory;
+import com.alibaba.alink.common.io.catalog.plugin.RichInputFormatWithClassLoader;
+import com.alibaba.alink.common.io.filesystem.FilePath;
+import com.alibaba.alink.common.io.filesystem.LocalFileSystem;
+import com.alibaba.alink.common.utils.DataSetConversionUtil;
+import com.alibaba.alink.common.utils.DataStreamConversionUtil;
+import com.alibaba.alink.operator.common.io.reader.HttpFileSplitReader;
+import com.alibaba.alink.params.io.IcebergCatalogParams;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@CatalogAnnotation(name = "iceberg")
+public class IcebergCatalog extends BaseCatalog {
+
+    public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalog.class);
+
+    private Catalog internal;
+
+    private static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
+    private static final String WAREHOUSE = "warehouse";
+    private static final String HIVE_URI = "uri";
+    private static final String CATALOG_TYPE = "catalog-type";
+    private static final String CATALOG_DEFAULT_DATABASE = "default-database";
+
+    private final IcebergClassLoaderFactory icebergClassLoaderFactory;
+
+    public IcebergCatalog(String catalogName, String defaultDatabase, String icebergVersion, String warehouse, String uri) {
+        this(catalogName, defaultDatabase, icebergVersion, null, "hive", warehouse, uri);
+    }
+
+    public IcebergCatalog(String catalogName, String defaultDatabase, String icebergVersion, FilePath hiveConfDir,
+                          String catalogType, String warehouse, String uri) {
+
+        this(new Params()
+            .set(IcebergCatalogParams.CATALOG_NAME, catalogName)
+            .set(IcebergCatalogParams.DEFAULT_DATABASE, defaultDatabase == null ? "default" : defaultDatabase)
+            .set(IcebergCatalogParams.HIVE_CONF_DIR, hiveConfDir == null ? null : hiveConfDir.serialize())
+            .set(IcebergCatalogParams.PLUGIN_VERSION, icebergVersion)
+            .set(IcebergCatalogParams.CATALOG_TYPE, catalogType)
+            .set(IcebergCatalogParams.WAREHOUSE, warehouse)
+            .set(IcebergCatalogParams.HIVE_URI, uri)
+        );
+    }
+
+    public IcebergCatalog(Params params) {
+        super(params);
+        icebergClassLoaderFactory = new IcebergClassLoaderFactory(getParams().get(IcebergCatalogParams.PLUGIN_VERSION));
+    }
+
+    @Override
+    public Table sourceStream(ObjectPath objectPath, Params params, Long sessionId) {
+        Catalog catalog = loadCatalog();
+
+        Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> all
+            = createInputFormat(MLEnvironmentFactory.get(sessionId).getStreamExecutionEnvironment(),
+            objectPath, catalog, icebergClassLoaderFactory);
+
+        DataStream<Row> dataStream = MLEnvironmentFactory
+            .get(sessionId)
+            .getStreamExecutionEnvironment()
+            .createInput(all.f2, all.f1)
+            .map(new RowDataToRow(all.f0.getFieldDataTypes()));
+
+        return DataStreamConversionUtil.toTable(sessionId, dataStream, all.f0);
+    }
+
+    @Override
+    public void sinkStream(ObjectPath objectPath, Table in, Params params, Long sessionId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Table sourceBatch(ObjectPath objectPath, Params params, Long sessionId) {
+        Catalog catalog = loadCatalog();
+
+        Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> all
+            = createInputFormat(MLEnvironmentFactory.get(sessionId).getStreamExecutionEnvironment(),
+            objectPath, catalog, icebergClassLoaderFactory);
+
+        DataSet<Row> ds = MLEnvironmentFactory.get(sessionId)
+            .getExecutionEnvironment()
+            .createInput(all.f2, all.f1)
+            .map(new IcebergCatalog.RowDataToRow(all.f0.getFieldDataTypes()));
+
+        return DataSetConversionUtil.toTable(sessionId, ds, all.f0);
+    }
+
+    private static class RowDataToRow implements MapFunction<RowData, Row> {
+        private static final long serialVersionUID = -2751018757273958023L;
+
+        DataType[] dataTypes;
+
+        RowDataToRow(DataType[] dataTypes) {
+            this.dataTypes = dataTypes;
+        }
+
+        @Override
+        public Row map(RowData baseRow) throws Exception {
+            Row row = new Row(baseRow.getArity());
+            for (int i = 0; i < baseRow.getArity(); i++) {
+                if (baseRow.isNullAt(i)) {
+                    row.setField(i, null);
+                } else {
+                    Object o = RowData
+                        .createFieldGetter(dataTypes[i].getLogicalType(), i)
+                        .getFieldOrNull(baseRow);
+
+                    // Map Flink's internal data formats to external Java types.
+                    if (o instanceof BinaryStringData) {
+                        o = o.toString();
+                    } else if (o instanceof DecimalData) {
+                        o = ((DecimalData) o).toBigDecimal();
+                    }
+
+                    row.setField(i, o);
+                }
+            }
+            return row;
+        }
+    }
+
+    @Override
+    public void sinkBatch(ObjectPath objectPath, Table in, Params params, Long sessionId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().open());
+    }
+
+    private Catalog loadCatalog() {
+        if (internal == null) {
+            internal = icebergClassLoaderFactory
+                .doAsThrowRuntime(() -> {
+                    Catalog catalog = createCatalog(getParams(), Thread.currentThread().getContextClassLoader());
+                    catalog.open();
+
+                    return catalog;
+                });
+        }
+
+        return internal;
+    }
+
+    private Catalog createCatalog(Params params, ClassLoader classLoader) throws ClassNotFoundException {
+        String catalogName = params.get(IcebergCatalogParams.CATALOG_NAME);
+
+        CatalogFactory factory = createCatalogFactory(classLoader);
+        Map<String, String> properties = new HashMap<>();
+
+        String catalogType = params.get(IcebergCatalogParams.CATALOG_TYPE);
+        properties.put(CATALOG_TYPE, catalogType);
+
+        if ("hive".equals(catalogType)) {
+            // Prefer an explicit hive-conf-dir; otherwise fall back to the
+            // metastore uri plus warehouse path.
+            if (params.contains(IcebergCatalogParams.HIVE_CONF_DIR)
+                && params.get(IcebergCatalogParams.HIVE_CONF_DIR) != null) {
+
+                String localHiveConfDir;
+                try {
+                    localHiveConfDir = downloadHiveConf(
+                        FilePath.deserialize(params.get(IcebergCatalogParams.HIVE_CONF_DIR))
+                    );
+                } catch (IOException e) {
+                    throw new IllegalStateException(e);
+                }
+                properties.put(CATALOG_HIVE_CONF_DIR, localHiveConfDir);
+            } else {
+                properties.put(HIVE_URI, params.get(IcebergCatalogParams.HIVE_URI));
+                properties.put(WAREHOUSE, params.get(IcebergCatalogParams.WAREHOUSE));
+            }
+        } else {
+            properties.put(WAREHOUSE, params.get(IcebergCatalogParams.WAREHOUSE));
+        }
+
+        if (params.get(IcebergCatalogParams.DEFAULT_DATABASE) != null) {
+            properties.put(CATALOG_DEFAULT_DATABASE, params.get(IcebergCatalogParams.DEFAULT_DATABASE));
+        }
+
+        return factory.createCatalog(catalogName, properties);
+    }
+
+    public static String downloadHiveConf(FilePath hiveConfDir) throws IOException {
+        return downloadFolder(hiveConfDir, "hive-site.xml");
+    }
+
+    public static String downloadFolder(FilePath folder, String... files) throws IOException {
+        // A local folder can be used in place.
+        if (folder.getFileSystem() instanceof LocalFileSystem) {
+            return folder.getPathStr();
+        }
+
+        File localConfDir = new File(System.getProperty("java.io.tmpdir"), FileUtils.getRandomFilename(""));
+        String scheme = folder.getPath().toUri().getScheme();
+
+        if (!localConfDir.mkdir()) {
+            throw new RuntimeException("Could not create the dir " + localConfDir.getAbsolutePath());
+        }
+
+        if (scheme != null && (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https"))) {
+            for (String path : files) {
+                try (HttpFileSplitReader reader = new HttpFileSplitReader(folder.getPathStr() + "/" + path)) {
+                    long fileLen = reader.getFileLength();
+                    reader.open(null, 0, fileLen);
+
+                    int offset = 0;
+                    byte[] buffer = new byte[1024];
+
+                    try (FileOutputStream outputStream = new FileOutputStream(
+                        Paths.get(localConfDir.getPath(), path).toFile())) {
+                        while (offset < fileLen) {
+                            // Always read into and write from the start of the buffer;
+                            // offset only tracks overall progress through the file.
+                            int len = reader.read(buffer, 0, buffer.length);
+                            outputStream.write(buffer, 0, len);
+                            offset += len;
+                        }
+                    }
+
+                } catch (FileNotFoundException exception) {
+                    // The file is optional; skip it when absent.
+                }
+            }
+        } else {
+            for (String path : files) {
+                // Generic file system: copy each file if it exists.
+                if (!folder.getFileSystem().exists(new Path(folder.getPath(), path))) {
+                    continue;
+                }
+
+                try (FSDataInputStream inputStream = folder.getFileSystem().open(
+                    new Path(folder.getPath(), path));
+                     FileOutputStream outputStream = new FileOutputStream(
+                         Paths.get(localConfDir.getPath(), path).toFile())) {
+                    IOUtils.copy(inputStream, outputStream);
+                }
+            }
+        }
+
+        return localConfDir.getAbsolutePath();
+    }
+
+    public static CatalogFactory createCatalogFactory(ClassLoader classLoader) {
+        try {
+            return (CatalogFactory) classLoader
+                .loadClass("org.apache.flink.iceberg.FlinkCatalogFactory")
+                .getConstructor()
+                .newInstance();
+        } catch (ClassNotFoundException | NoSuchMethodException
+            | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+
+            throw new RuntimeException("Could not find the iceberg catalog factory.", e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().close());
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().listDatabases());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+        throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+        throws DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listViews(String databaseName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().tableExists(tablePath));
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+        throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+        throws PartitionAlreadyExistsException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+        throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
+        throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+        throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @SuppressWarnings("unchecked")
+    private Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> createInputFormat(
+        StreamExecutionEnvironment senv, ObjectPath objectPath, Catalog catalog, IcebergClassLoaderFactory factory) {
+
+        return factory.doAsThrowRuntime(() -> {
+            // The bridge class lives in the plugin classloader, so it is
+            // reached reflectively through the context classloader.
+            Class<?> inputOutputFormat = Class.forName(
+                "org.apache.flink.iceberg.InputOutputFormat",
+                true, Thread.currentThread().getContextClassLoader()
+            );
+
+            Method method = inputOutputFormat.getMethod(
+                "createInputFormat", StreamExecutionEnvironment.class, Catalog.class, ObjectPath.class);
+
+            Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> internalRet =
+                (Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>>) method
+                    .invoke(null, senv, catalog, objectPath);
+
+            // Wrap the format so the plugin classloader is re-installed when the
+            // format is deserialized and run on the task managers.
+            return Tuple3.of(internalRet.f0, internalRet.f1,
+                new RichInputFormatWithClassLoader<>(factory, internalRet.f2));
+        });
+    }
+}
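A usage sketch of the catalog above, under stated assumptions: the iceberg plugin jars for version 0.12.0 are installed in the Alink plugin directory, the warehouse path and table name are placeholders, and 0L stands for the default Alink session id. With catalog-type "hadoop", createCatalog only reads the warehouse path, so hive-conf-dir and uri may be null:

    // Sketch, not part of the patch: reading an Iceberg table as an Alink batch Table.
    IcebergCatalog catalog = new IcebergCatalog(
        "iceberg_catalog",                  // catalog name
        "default",                          // default database
        "0.12.0",                           // plugin version, matching iceberg.version in the pom
        null,                               // hive-conf-dir: unused for catalog-type=hadoop
        "hadoop",                           // catalog-type
        "file:///tmp/iceberg/warehouse",    // warehouse root (placeholder)
        null);                              // metastore uri: only read for catalog-type=hive
    catalog.open();
    Table table = catalog.sourceBatch(new ObjectPath("default", "orders"), new Params(), 0L);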
diff --git a/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java b/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java
new file mode 100644
index 000000000..9db5b8c2f
--- /dev/null
+++ b/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java
@@ -0,0 +1,67 @@
+package com.alibaba.alink.common.io.catalog.plugin;
+
+import com.alibaba.alink.common.io.plugin.ClassLoaderContainer;
+import com.alibaba.alink.common.io.plugin.ClassLoaderFactory;
+import com.alibaba.alink.common.io.plugin.PluginDescriptor;
+import com.alibaba.alink.common.io.plugin.RegisterKey;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+public class IcebergClassLoaderFactory extends ClassLoaderFactory implements Serializable {
+    private static final long serialVersionUID = 1233515335175475912L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergClassLoaderFactory.class);
+
+    private static final String ICEBERG_DB_NAME = "iceberg";
+
+    public IcebergClassLoaderFactory(String version) {
+        super(new RegisterKey(ICEBERG_DB_NAME, version), ClassLoaderContainer.createPluginContextOnClient());
+    }
+
+    @Override
+    public <T> T doAs(PrivilegedExceptionAction<T> action) throws Exception {
+        ClassLoader classLoader = create();
+
+        // Run the action with the plugin classloader installed as the thread's
+        // context classloader, restoring the previous one afterwards.
+        try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
+            return action.run();
+        }
+    }
+
+    @Override
+    public ClassLoader create() {
+        return ClassLoaderContainer
+            .getInstance()
+            .create(
+                registerKey,
+                registerContext,
+                Factory.class,
+                new IcebergServiceFilter(),
+                new IcebergCatalogVersionGetter()
+            );
+    }
+
+    private static class IcebergServiceFilter implements Predicate<Factory> {
+
+        @Override
+        public boolean test(Factory factory) {
+            return factory.getClass().getName().contains("FlinkCatalogFactory");
+        }
+    }
+
+    private static class IcebergCatalogVersionGetter implements
+        Function<Tuple2<Factory, PluginDescriptor>, String> {
+
+        @Override
+        public String apply(Tuple2<Factory, PluginDescriptor> factory) {
+            return factory.f1.getVersion();
+        }
+    }
+}
\ No newline at end of file
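The factory implements the usual Alink plugin-isolation pattern: run an action with the plugin classloader as the thread's context classloader, so iceberg classes resolve to the plugin jars rather than the application classpath. A sketch, assuming doAsThrowRuntime is the exception-wrapping variant inherited from ClassLoaderFactory (it is the method IcebergCatalog calls above) and that the 0.12.0 plugin jars are installed:

    // Sketch, not part of the patch: resolving a bridge class through the plugin classloader.
    IcebergClassLoaderFactory factory = new IcebergClassLoaderFactory("0.12.0");
    String factoryClassName = factory.doAsThrowRuntime(() -> {
        // Inside the action the context classloader is the plugin classloader.
        ClassLoader pluginClassLoader = Thread.currentThread().getContextClassLoader();
        return Class
            .forName("org.apache.flink.iceberg.FlinkCatalogFactory", false, pluginClassLoader)
            .getName();
    });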
catalog type") + .setRequired() + .build(); + + default T setCatalogType(String value) { + return set(CATALOG_TYPE, value); + } + + default String getCatalogType() { + return get(CATALOG_TYPE); + } + + ParamInfo WAREHOUSE = ParamInfoFactory + .createParamInfo("warehouse", String.class) + .setDescription( + "The Hive warehouse location.The Hive warehouse location, users should specify this path" + + " if neither set the hive-conf-dir to specify a location containing a hive-site.xml " + + "configuration file nor add a correct hive-site.xml to classpath") + .build(); + + default T setWarehouse(String value) { + return set(WAREHOUSE, value); + } + + default String getWarehouse() { + return get(WAREHOUSE); + } + + + ParamInfo HIVE_CONF_DIR = ParamInfoFactory + .createParamInfo("hiveConfDir", String.class) + .setDescription("Hive configuration directory") + .build(); + + default T setHiveConfDir(String value) { + return set(HIVE_CONF_DIR, value); + } + + default String getHiveConfDir() { + return get(HIVE_CONF_DIR); + } + + ParamInfo HIVE_URI = ParamInfoFactory + .createParamInfo("uri", String.class) + .setDescription("Hive metastore uri ") + .build(); + + default T setHiveUri(String value) { + return set(HIVE_URI, value); + } + + default String getHiveUri() { + return get(HIVE_URI); + } +}