Skip to content

Commit

Permalink
add iceberg support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjun0x01 committed Oct 22, 2021
1 parent c1cd338 commit 2f52c66
Show file tree
Hide file tree
Showing 7 changed files with 943 additions and 0 deletions.
209 changes: 209 additions & 0 deletions connectors/connector-iceberg/iceberg-bridge/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>alink_connectors_iceberg</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>alink_iceberg_bridge_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
<packaging>jar</packaging>
<name>alink-iceberg-bridge</name>

<properties>
<hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
<flink.shaded.version>9.0</flink.shaded.version>
<hive.version>2.3.4</hive.version>
<iceberg.version>0.12.0</iceberg.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>${hivemetastore.hadoop.version}-${flink.shaded.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-vector-code-gen</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-tez</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-druid</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>${iceberg.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.iceberg.flink;

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

import java.io.IOException;
import java.io.UncheckedIOException;

public class InputOutputFormat {

public static Tuple2<TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
StreamExecutionEnvironment execEnv, Catalog catalog, ObjectPath objectPath) {

if (!(catalog instanceof FlinkCatalog)) {
throw new RuntimeException("Catalog should be iceberg catalog.");
}

TableLoader tableLoader = createTableLoader((FlinkCatalog) catalog, objectPath);

Table table;
Schema icebergSchema;
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
icebergSchema = table.schema();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema));
FlinkInputFormat flinkInputFormat = FlinkSource.forRowData()
.env(execEnv)
.tableLoader(tableLoader)
.table(table)
.buildFormat();
return Tuple2.of(typeInfo, flinkInputFormat);
}

private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
}
}
20 changes: 20 additions & 0 deletions connectors/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>alink_connectors_iceberg</artifactId>
<packaging>pom</packaging>
<name>alink-connector-iceberg</name>

<modules>
<module>iceberg-bridge</module>
</modules>
</project>
1 change: 1 addition & 0 deletions connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<module>connector-odps</module>
<module>connector-jdbc</module>
<module>connector-hive</module>
<module>connector-iceberg</module>
<module>filesystem</module>
</modules>

Expand Down
Loading

0 comments on commit 2f52c66

Please sign in to comment.