diff --git a/connectors/connector-iceberg/iceberg-bridge/pom.xml b/connectors/connector-iceberg/iceberg-bridge/pom.xml
new file mode 100644
index 000000000..dd8f92e53
--- /dev/null
+++ b/connectors/connector-iceberg/iceberg-bridge/pom.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>alink_connectors_iceberg</artifactId>
+        <groupId>com.alibaba.alink</groupId>
+        <version>1.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>alink_iceberg_bridge_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
+    <packaging>jar</packaging>
+    <name>alink-iceberg-bridge</name>
+
+    <properties>
+        <hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
+        <flink.shaded.version>9.0</flink.shaded.version>
+        <hive.version>2.3.4</hive.version>
+        <iceberg.version>0.12.0</iceberg.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+            <version>${hivemetastore.hadoop.version}-${flink.shaded.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-hive_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${alink.scala.major.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-vector-code-gen</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-llap-tez</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-httpclient</groupId>
+                    <artifactId>commons-httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>ST4</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ant</groupId>
+                    <artifactId>ant</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.ivy</groupId>
+                    <artifactId>ivy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>apache-curator</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.groovy</groupId>
+                    <artifactId>groovy-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>stax</groupId>
+                    <artifactId>stax-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>apache-log4j-extras</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.21</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java b/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java
new file mode 100644
index 000000000..4925b2c10
--- /dev/null
+++ b/connectors/connector-iceberg/iceberg-bridge/src/main/java/org/apache/flink/iceberg/InputOutputFormat.java
@@ -0,0 +1,57 @@
+package org.apache.flink.iceberg;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iceberg.source.FlinkInputFormat;
+import org.apache.flink.iceberg.source.FlinkSource;
+import org.apache.flink.iceberg.util.FlinkCompatibilityUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
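+/**
+ * Bridge entry point between Alink and the Iceberg Flink runtime: it resolves an
+ * Iceberg table through a {@link FlinkCatalog} and exposes it as a Flink
+ * {@link RichInputFormat} together with the table schema and the produced
+ * {@link TypeInformation}.
+ */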
+public class InputOutputFormat {
+
+ public static Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
+ StreamExecutionEnvironment execEnv, Catalog catalog, ObjectPath objectPath) {
+
+ if (!(catalog instanceof FlinkCatalog)) {
+ throw new RuntimeException("Catalog should be an Iceberg catalog.");
+ }
+
+ TableLoader tableLoader = createTableLoader((FlinkCatalog) catalog, objectPath);
+
+ Table table;
+ Schema icebergSchema;
+ tableLoader.open();
+ try (TableLoader loader = tableLoader) {
+ table = loader.loadTable();
+ icebergSchema = table.schema();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema));
+
+ TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema));
+ FlinkInputFormat flinkInputFormat = FlinkSource.forRowData()
+ .env(execEnv)
+ .tableLoader(tableLoader)
+ .table(table)
+ .buildFormat();
+ return Tuple3.of(tableSchema, typeInfo, flinkInputFormat);
+ }
+
+ private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+ Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+ return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+ }
+}
diff --git a/connectors/connector-iceberg/pom.xml b/connectors/connector-iceberg/pom.xml
new file mode 100644
index 000000000..36f4fafc6
--- /dev/null
+++ b/connectors/connector-iceberg/pom.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>alink_connectors</artifactId>
+        <groupId>com.alibaba.alink</groupId>
+        <version>1.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>alink_connectors_iceberg</artifactId>
+    <packaging>pom</packaging>
+    <name>alink-connector-iceberg</name>
+
+    <modules>
+        <module>iceberg-bridge</module>
+    </modules>
+</project>
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 32a65181e..16e11792d 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -19,6 +19,7 @@
         <module>connector-odps</module>
         <module>connector-jdbc</module>
         <module>connector-hive</module>
+        <module>connector-iceberg</module>
         <module>filesystem</module>
diff --git a/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java b/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java
new file mode 100644
index 000000000..1ed225c37
--- /dev/null
+++ b/core/src/main/java/com/alibaba/alink/common/io/catalog/IcebergCatalog.java
@@ -0,0 +1,513 @@
+package com.alibaba.alink.common.io.catalog;
+
+import com.alibaba.alink.common.MLEnvironmentFactory;
+import com.alibaba.alink.common.io.annotations.CatalogAnnotation;
+import com.alibaba.alink.common.io.catalog.plugin.IcebergClassLoaderFactory;
+import com.alibaba.alink.common.io.catalog.plugin.RichInputFormatWithClassLoader;
+import com.alibaba.alink.common.io.filesystem.FilePath;
+import com.alibaba.alink.common.io.filesystem.LocalFileSystem;
+import com.alibaba.alink.common.utils.DataSetConversionUtil;
+import com.alibaba.alink.common.utils.DataStreamConversionUtil;
+import com.alibaba.alink.operator.common.io.reader.HttpFileSplitReader;
+import com.alibaba.alink.params.io.IcebergCatalogParams;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
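+/**
+ * An Alink catalog backed by Apache Iceberg. The Iceberg Flink runtime is loaded
+ * through a dedicated plugin class loader, so tables can be read without putting
+ * Iceberg on the application classpath; table access is delegated to the bridge
+ * module via reflection (see createInputFormat below).
+ */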
+@CatalogAnnotation(name = "iceberg")
+public class IcebergCatalog extends BaseCatalog {
+
+ public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalog.class);
+
+ private Catalog internal;
+ private static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
+ private static final String WAREHOUSE = "warehouse";
+ private static final String HIVE_URI = "uri";
+ private static final String CATALOG_TYPE = "catalog-type";
+ private static final String CATALOG_DEFAULT_DATABASE = "default-database";
+ private final IcebergClassLoaderFactory icebergClassLoaderFactory;
+
+
+ public IcebergCatalog(String catalogName, String defaultDatabase, String icebergVersion, String warehouse, String uri) {
+ this(catalogName, defaultDatabase, icebergVersion, null, "hive", warehouse, uri);
+ }
+
+ public IcebergCatalog(String catalogName, String defaultDatabase, String icebergVersion, FilePath hiveConfDir,
+ String catalogType, String warehouse, String uri) {
+
+ this(new Params()
+ .set(IcebergCatalogParams.CATALOG_NAME, catalogName)
+ .set(IcebergCatalogParams.DEFAULT_DATABASE, defaultDatabase == null ? "default" : defaultDatabase)
+ .set(IcebergCatalogParams.HIVE_CONF_DIR, hiveConfDir == null ? null : hiveConfDir.serialize())
+ .set(IcebergCatalogParams.PLUGIN_VERSION, icebergVersion)
+ .set(IcebergCatalogParams.CATALOG_TYPE, catalogType)
+ .set(IcebergCatalogParams.WAREHOUSE, warehouse)
+ .set(IcebergCatalogParams.HIVE_URI, uri)
+ );
+ }
+
+ public IcebergCatalog(Params params) {
+ super(params);
+ icebergClassLoaderFactory = new IcebergClassLoaderFactory(getParams().get(IcebergCatalogParams.PLUGIN_VERSION));
+ }
+
+ @Override
+ public Table sourceStream(ObjectPath objectPath, Params params, Long sessionId) {
+ Catalog catalog = loadCatalog();
+
+ Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> all
+ = createInputFormat(MLEnvironmentFactory.get(sessionId).getStreamExecutionEnvironment(),
+ objectPath, catalog, icebergClassLoaderFactory);
+
+ DataStream<Row> dataStream = MLEnvironmentFactory
+ .get(sessionId)
+ .getStreamExecutionEnvironment()
+ .createInput(all.f2, all.f1)
+ .map(new RowDataToRow(all.f0.getFieldDataTypes()));
+
+ return DataStreamConversionUtil.toTable(sessionId, dataStream, all.f0);
+ }
+
+ @Override
+ public void sinkStream(ObjectPath objectPath, Table in, Params params, Long sessionId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Table sourceBatch(ObjectPath objectPath, Params params, Long sessionId) {
+ Catalog catalog = loadCatalog();
+
+ Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> all
+ = createInputFormat(MLEnvironmentFactory.get(sessionId).getStreamExecutionEnvironment(),
+ objectPath, catalog, icebergClassLoaderFactory);
+
+ DataSet<Row> ds = MLEnvironmentFactory.get(sessionId)
+ .getExecutionEnvironment()
+ .createInput(all.f2, all.f1)
+ .map(new IcebergCatalog.RowDataToRow(all.f0.getFieldDataTypes()));
+
+ return DataSetConversionUtil.toTable(sessionId, ds, all.f0);
+ }
+
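+ /**
+ * Converts the internal {@link RowData} records produced by the Iceberg source
+ * into Alink's external {@link Row} representation, unwrapping binary strings
+ * and decimal values into plain Java types.
+ */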
+ private static class RowDataToRow implements MapFunction<RowData, Row> {
+ private static final long serialVersionUID = -2751018757273958023L;
+
+ DataType[] dataTypes;
+
+ RowDataToRow(DataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ @Override
+ public Row map(RowData baseRow) throws Exception {
+ Row row = new Row(baseRow.getArity());
+ for (int i = 0; i < baseRow.getArity(); i++) {
+ if (baseRow.isNullAt(i)) {
+ row.setField(i, null);
+ } else {
+ Object o = RowData
+ .createFieldGetter(dataTypes[i].getLogicalType(), i)
+ .getFieldOrNull(baseRow);
+
+ if (o instanceof BinaryStringData) {
+ o = o.toString();
+ } else if (o instanceof DecimalData) {
+ o = ((DecimalData) o).toBigDecimal();
+ }
+
+ row.setField(i, o);
+ }
+ }
+ return row;
+ }
+ }
+
+ @Override
+ public void sinkBatch(ObjectPath objectPath, Table in, Params params, Long sessionId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().open());
+ }
+
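+ // Lazily creates and caches the underlying Iceberg catalog inside the plugin
+ // class loader, so repeated source calls reuse a single opened catalog.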
+ private Catalog loadCatalog() {
+ if (internal == null) {
+ internal = icebergClassLoaderFactory
+ .doAsThrowRuntime(() -> {
+ Catalog catalog = createCatalog(getParams(), Thread.currentThread().getContextClassLoader());
+ catalog.open();
+
+ return catalog;
+ });
+ }
+
+ return internal;
+ }
+
+
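+ // Builds the property map understood by Iceberg's FlinkCatalogFactory: a hive
+ // catalog is located either through a (downloaded) directory containing
+ // hive-site.xml or through an explicit metastore uri plus warehouse path,
+ // while a hadoop catalog only needs the warehouse path.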
+ private Catalog createCatalog(Params params, ClassLoader classLoader) throws ClassNotFoundException {
+ String catalogName = params.get(IcebergCatalogParams.CATALOG_NAME);
+
+ CatalogFactory factory = createCatalogFactory(classLoader);
+ Map<String, String> properties = new HashMap<>();
+
+ String catalogType = params.get(IcebergCatalogParams.CATALOG_TYPE);
+ properties.put(CATALOG_TYPE, catalogType);
+ if ("hive".equals(catalogType)) {
+ if (params.contains(IcebergCatalogParams.HIVE_CONF_DIR) && params.get(IcebergCatalogParams.HIVE_CONF_DIR) != null) {
+ String localHiveConfDir;
+ try {
+ localHiveConfDir = downloadHiveConf(
+ FilePath.deserialize(params.get(IcebergCatalogParams.HIVE_CONF_DIR))
+ );
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ properties.put(CATALOG_HIVE_CONF_DIR, localHiveConfDir);
+ } else {
+ properties.put(HIVE_URI, params.get(IcebergCatalogParams.HIVE_URI));
+ properties.put(WAREHOUSE, params.get(IcebergCatalogParams.WAREHOUSE));
+ }
+ } else {
+ properties.put(WAREHOUSE, params.get(IcebergCatalogParams.WAREHOUSE));
+ }
+
+
+ if (params.get(IcebergCatalogParams.DEFAULT_DATABASE) != null) {
+ properties.put(CATALOG_DEFAULT_DATABASE, params.get(IcebergCatalogParams.DEFAULT_DATABASE));
+ }
+
+ return factory.createCatalog(catalogName, properties);
+ }
+
+ public static String downloadHiveConf(FilePath hiveConfDir) throws IOException {
+ return downloadFolder(hiveConfDir, "hive-site.xml");
+ }
+
+ public static String downloadFolder(FilePath folder, String... files) throws IOException {
+ // local
+ if (folder.getFileSystem() instanceof LocalFileSystem) {
+ return folder.getPathStr();
+ }
+
+ File localConfDir = new File(System.getProperty("java.io.tmpdir"), FileUtils.getRandomFilename(""));
+ String scheme = folder.getPath().toUri().getScheme();
+
+ if (!localConfDir.mkdir()) {
+ throw new RuntimeException("Could not create the dir " + localConfDir.getAbsolutePath());
+ }
+
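+ // The configuration files may live behind HTTP(S) or on a remote file system;
+ // copy the requested files into a temporary local directory, silently skipping
+ // files that do not exist.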
+ if (scheme != null && (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https"))) {
+ for (String path : files) {
+ try (HttpFileSplitReader reader = new HttpFileSplitReader(folder.getPathStr() + "/" + path)) {
+ long fileLen = reader.getFileLength();
+ reader.open(null, 0, fileLen);
+
+ long offset = 0;
+ byte[] buffer = new byte[1024];
+
+ try (FileOutputStream outputStream = new FileOutputStream(
+ Paths.get(localConfDir.getPath(), path).toFile())) {
+ while (offset < fileLen) {
+ // Always read into the start of the buffer and copy out the bytes actually
+ // read; reading at the running file offset would overflow the fixed 1 KB
+ // buffer after the first pass. The reader is assumed to return the number
+ // of bytes read, or a negative value at end of input.
+ int len = reader.read(buffer, 0, buffer.length);
+ if (len < 0) {
+ break;
+ }
+ outputStream.write(buffer, 0, len);
+ offset += len;
+ }
+ }
+
+ } catch (FileNotFoundException exception) {
+ // pass
+ }
+ }
+ } else {
+ for (String path : files) {
+ // file system
+ if (!folder.getFileSystem().exists(new Path(folder.getPath(), path))) {
+ continue;
+ }
+
+ try (FSDataInputStream inputStream = folder.getFileSystem().open(
+ new Path(folder.getPath(), path));
+ FileOutputStream outputStream = new FileOutputStream(
+ Paths.get(localConfDir.getPath(), path).toFile())) {
+ IOUtils.copy(inputStream, outputStream);
+ }
+ }
+ }
+
+ return localConfDir.getAbsolutePath();
+ }
+
+ public static CatalogFactory createCatalogFactory(ClassLoader classLoader) {
+ try {
+ return (CatalogFactory) classLoader
+ .loadClass("org.apache.flink.iceberg.FlinkCatalogFactory")
+ .getConstructor()
+ .newInstance();
+ } catch (ClassNotFoundException | NoSuchMethodException
+ | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+
+ throw new RuntimeException("Could not find the Iceberg catalog factory.", e);
+ }
+ }
+
+
+ @Override
+ public void close() throws CatalogException {
+ icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().close());
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().listDatabases());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotEmptyException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return icebergClassLoaderFactory.doAsThrowRuntime(() -> loadCatalog().tableExists(tablePath));
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+ throws PartitionAlreadyExistsException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
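+ // The bridge class only exists inside the plugin class loader, so it is
+ // resolved and invoked reflectively; the returned input format is wrapped with
+ // the class loader factory so that the plugin classes remain resolvable when
+ // the format runs on the cluster.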
+ private Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> createInputFormat(
+ StreamExecutionEnvironment senv, ObjectPath objectPath, Catalog catalog, IcebergClassLoaderFactory factory) {
+
+ return factory.doAsThrowRuntime(() -> {
+ Class<?> inputOutputFormat = Class.forName(
+ "org.apache.flink.iceberg.InputOutputFormat",
+ true, Thread.currentThread().getContextClassLoader()
+ );
+
+ Method method = inputOutputFormat.getMethod("createInputFormat", StreamExecutionEnvironment.class, Catalog.class, ObjectPath.class);
+ Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>> internalRet =
+ (Tuple3<TableSchema, TypeInformation<RowData>, RichInputFormat<RowData, InputSplit>>) method.invoke(null, senv, catalog, objectPath);
+
+ return Tuple3.of(internalRet.f0, internalRet.f1,
+ new RichInputFormatWithClassLoader<>(factory, internalRet.f2));
+ });
+ }
+}
diff --git a/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java b/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java
new file mode 100644
index 000000000..9db5b8c2f
--- /dev/null
+++ b/core/src/main/java/com/alibaba/alink/common/io/catalog/plugin/IcebergClassLoaderFactory.java
@@ -0,0 +1,67 @@
+package com.alibaba.alink.common.io.catalog.plugin;
+
+import com.alibaba.alink.common.io.plugin.ClassLoaderContainer;
+import com.alibaba.alink.common.io.plugin.ClassLoaderFactory;
+import com.alibaba.alink.common.io.plugin.PluginDescriptor;
+import com.alibaba.alink.common.io.plugin.RegisterKey;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+public class IcebergClassLoaderFactory extends ClassLoaderFactory implements Serializable {
+ private static final long serialVersionUID = 1233515335175475912L;
+
+ private final static Logger LOG = LoggerFactory.getLogger(IcebergClassLoaderFactory.class);
+ private static final String ICEBERG_DB_NAME = "iceberg";
+
+ public IcebergClassLoaderFactory(String version) {
+ super(new RegisterKey(ICEBERG_DB_NAME, version), ClassLoaderContainer.createPluginContextOnClient());
+ }
+
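+ // Runs the given action with the plugin class loader installed as the thread
+ // context class loader; TemporaryClassLoaderContext restores the previous one
+ // when the try block exits.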
+ @Override
+ public <T> T doAs(PrivilegedExceptionAction<T> action) throws Exception {
+
+ ClassLoader classLoader = create();
+
+ try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
+ return action.run();
+ }
+ }
+
+ @Override
+ public ClassLoader create() {
+ return ClassLoaderContainer
+ .getInstance()
+ .create(
+ registerKey,
+ registerContext,
+ Factory.class,
+ new IcebergServiceFilter(),
+ new IcebergCatalogVersionGetter()
+ );
+ }
+
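+ // Selects Iceberg's FlinkCatalogFactory among the Factory services discovered
+ // in the plugin jars.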
+ private static class IcebergServiceFilter implements Predicate<Factory> {
+
+ @Override
+ public boolean test(Factory factory) {
+ return factory.getClass().getName().contains("FlinkCatalogFactory");
+ }
+ }
+
+ private static class IcebergCatalogVersionGetter implements
+ Function<Tuple2<Factory, PluginDescriptor>, String> {
+
+ @Override
+ public String apply(Tuple2<Factory, PluginDescriptor> factory) {
+ return factory.f1.getVersion();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/alibaba/alink/params/io/IcebergCatalogParams.java b/core/src/main/java/com/alibaba/alink/params/io/IcebergCatalogParams.java
new file mode 100644
index 000000000..36c9dc114
--- /dev/null
+++ b/core/src/main/java/com/alibaba/alink/params/io/IcebergCatalogParams.java
@@ -0,0 +1,75 @@
+package com.alibaba.alink.params.io;
+
+import com.alibaba.alink.params.io.shared.HasCatalogName;
+import com.alibaba.alink.params.io.shared.HasDefaultDatabase;
+import com.alibaba.alink.params.io.shared.HasPartition;
+import com.alibaba.alink.params.io.shared.HasPartitions;
+import com.alibaba.alink.params.io.shared.HasPluginVersion;
+import com.alibaba.alink.params.shared.HasOverwriteSink;
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+
+public interface IcebergCatalogParams<T>
+ extends HasCatalogName<T>,
+ HasDefaultDatabase<T>,
+ HasPluginVersion<T>,
+ HasOverwriteSink<T> {
+
+
+ ParamInfo<String> CATALOG_TYPE = ParamInfoFactory
+ .createParamInfo("catalog-type", String.class)
+ .setDescription("Iceberg currently supports the hive and hadoop catalog types")
+ .setRequired()
+ .build();
+
+ default T setCatalogType(String value) {
+ return set(CATALOG_TYPE, value);
+ }
+
+ default String getCatalogType() {
+ return get(CATALOG_TYPE);
+ }
+
+ ParamInfo<String> WAREHOUSE = ParamInfoFactory
+ .createParamInfo("warehouse", String.class)
+ .setDescription(
+ "The Hive warehouse location. Users should specify this path if they neither set " +
+ "hive-conf-dir to a location containing a hive-site.xml configuration file nor " +
+ "add a correct hive-site.xml to the classpath")
+ .build();
+
+ default T setWarehouse(String value) {
+ return set(WAREHOUSE, value);
+ }
+
+ default String getWarehouse() {
+ return get(WAREHOUSE);
+ }
+
+
+ ParamInfo<String> HIVE_CONF_DIR = ParamInfoFactory
+ .createParamInfo("hiveConfDir", String.class)
+ .setDescription("Hive configuration directory")
+ .build();
+
+ default T setHiveConfDir(String value) {
+ return set(HIVE_CONF_DIR, value);
+ }
+
+ default String getHiveConfDir() {
+ return get(HIVE_CONF_DIR);
+ }
+
+ ParamInfo<String> HIVE_URI = ParamInfoFactory
+ .createParamInfo("uri", String.class)
+ .setDescription("Hive metastore URI")
+ .build();
+
+ default T setHiveUri(String value) {
+ return set(HIVE_URI, value);
+ }
+
+ default String getHiveUri() {
+ return get(HIVE_URI);
+ }
+}