Add Delta Lake example (#17)
rmoff authored Jul 12, 2024
1 parent 14955dd commit 4dcad26
Showing 10 changed files with 643 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -36,6 +36,7 @@ This repository contains examples of use cases that utilize Decodable streaming
|[Array Aggregation](array-agg)| Using the `array_agg()` UDF for denormalizing data in a pipeline from MySQL to OpenSearch |
|[Kafka with ngrok](kafka-ngrok)| Docker Compose for running Apache Kafka locally, accessible from the internet using ngrok|
|[PyFlink on Decodable](pyflink-decodable)| Running a PyFlink job as a Custom Pipeline on Decodable|
|[Delta Lake / Flink](flink-delta-lake)| Writing to Delta Lake with Apache Flink |


## License
9 changes: 9 additions & 0 deletions flink-delta-lake/delta-flink.sql
@@ -0,0 +1,9 @@
CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory');

CREATE DATABASE c_delta.db_new;

CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ( 'connector' = 'delta', 'table-path' = 's3a://warehouse/t_foo');

INSERT INTO c_delta.db_new.t_foo
SELECT name, 42
FROM (VALUES ('Never'), ('Gonna'), ('Give'), ('You'), ('Up')) AS NameTable(name);
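One way to run this script, sketched under a couple of assumptions: the Docker Compose stack below is up, the repository root is mounted at /data in the jobmanager container (see its volumes entry), and Flink 1.18's SQL Client -f flag is used to execute the file non-interactively.

# Sketch: submit the Delta Lake SQL to the Flink cluster defined in docker-compose.yml.
# Assumes the jobmanager container is healthy before the SQL Client is invoked.
docker compose up -d
docker exec -it jobmanager ./bin/sql-client.sh -f /data/delta-flink.sql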
130 changes: 130 additions & 0 deletions flink-delta-lake/docker-compose.yml
@@ -0,0 +1,130 @@
services:
  jobmanager:
    build: ./flink
    hostname: jobmanager
    container_name: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - .:/data/
    environment:
      - |
        FLINK_PROPERTIES=
        flink.hadoop.fs.s3a.access.key: admin
        flink.hadoop.fs.s3a.secret.key: password
        flink.hadoop.fs.s3a.endpoint: http://minio:9000
        flink.hadoop.fs.s3a.path.style.access: true
        fs.s3a.access.key: admin
        fs.s3a.secret.key: password
        fs.s3a.endpoint: http://minio:9000
        fs.s3a.path.style.access: true
        jobmanager.rpc.address: jobmanager
        rest.flamegraph.enabled: true
  taskmanager:
    build: ./flink
    hostname: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    deploy:
      replicas: 2
    environment:
      - |
        FLINK_PROPERTIES=
        flink.hadoop.fs.s3a.access.key: admin
        flink.hadoop.fs.s3a.secret.key: password
        flink.hadoop.fs.s3a.endpoint: http://minio:9000
        flink.hadoop.fs.s3a.path.style.access: true
        fs.s3a.access.key: admin
        fs.s3a.secret.key: password
        fs.s3a.endpoint: http://minio:9000
        fs.s3a.path.style.access: true
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.1
    container_name: broker
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: DOCKER://broker:29092, LOCALHOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: DOCKER://broker:29092, LOCALHOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,LOCALHOST:PLAINTEXT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092

  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      tail -f /dev/null
      "
  hive-metastore:
    container_name: hms
    build: ./hms-standalone-s3
    ports:
      - "9083:9083"
    environment:
      - HMS_LOGLEVEL=INFO

  duckdb:
    image: davidgasquez/duckdb
    container_name: duckdb
    restart: no
    entrypoint: tail -f /dev/null

  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    restart: no
    entrypoint: tail -f /dev/null

  shadowtraffic:
    # watch 'docker exec shadowtraffic curl -s localhost:9400/metrics |grep events_sent'
    image: shadowtraffic/shadowtraffic:0.6.0
    container_name: shadowtraffic
    # profiles: ["shadowtraffic"]
    env_file:
      - shadowtraffic/license.env
    volumes:
      - ./shadowtraffic:/data
    command: --config /data/kafka-retail.json

# Without a network explicitly defined, you hit this Hive/Thrift error
# java.net.URISyntaxException Illegal character in hostname
# https://github.com/TrivadisPF/platys-modern-data-platform/issues/231
networks:
  default:
    name: zaphod
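Once the INSERT job has written to s3a://warehouse/t_foo, the duckdb container above can be used to read the Delta table back from MinIO. This is a hedged sketch: it assumes the duckdb binary is on the PATH in the davidgasquez/duckdb image and that the httpfs and delta extensions can be installed at runtime.

# Sketch: read the Delta table straight from MinIO with DuckDB.
# Note that DuckDB addresses the bucket as s3:// where Flink used s3a://.
docker exec -it duckdb duckdb -c "
INSTALL httpfs; LOAD httpfs;
INSTALL delta;  LOAD delta;
CREATE SECRET minio (
    TYPE S3,
    KEY_ID 'admin',
    SECRET 'password',
    ENDPOINT 'minio:9000',
    URL_STYLE 'path',
    USE_SSL false
);
SELECT * FROM delta_scan('s3://warehouse/t_foo');
"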
100 changes: 100 additions & 0 deletions flink-delta-lake/flink/Dockerfile
@@ -0,0 +1,100 @@
FROM apache/flink:1.18.1-scala_2.12-java11
SHELL ["/bin/bash", "-c"]

# Install some useful tools
RUN apt-get update && \
apt-get install -y neovim tree lnav unzip && \
apt-get purge -y --auto-remove && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

RUN wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-amd64.zip \
&& unzip duckdb_cli-linux-amd64.zip -d /usr/local/bin \
&& rm duckdb_cli-linux-amd64.zip

USER flink
WORKDIR /opt/flink

COPY --chown=flink conf/hive-site.xml ./conf/hive-site.xml
# COPY --chown=flink conf/log4j.properties ./conf/log4j-console.properties

# Enable SQL Client to find the job manager when running it from this image
RUN sed -i "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: flink-jobmanager/g" ./conf/flink-conf.yaml

# # Enable this for debug logging
# RUN cat >> ./conf/log4j.properties <<EOF
# logger.fs.name = org.apache.hadoop.fs
# logger.fs.level = TRACE
# logger.fs2.name = org.apache.flink.fs
# logger.fs2.level = TRACE
# logger.aws.name = com.amazonaws
# logger.aws.level = TRACE
# logger.delta.name = io.delta
# logger.delta.level =TRACE
# EOF

# RUN cat >> ./conf/log4j-cli.properties <<EOF
# logger.fs.name = org.apache.hadoop.fs
# logger.fs.level = TRACE
# logger.fs2.name = org.apache.flink.fs
# logger.fs2.level = TRACE
# logger.aws.name = com.amazonaws
# logger.aws.level = TRACE
# logger.delta.name = io.delta
# logger.delta.level =TRACE
# EOF

# Install JARs
# Create necessary directories
RUN mkdir -p ./lib/delta ./lib/kafka ./lib/hive ./lib/hadoop

RUN echo "Add Flink S3 Plugin" && \
mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

# Download and Install JARs

RUN echo "-> Install JARs: Flink's Kafka connector" && \
mkdir -p ./lib/kafka && pushd $_ && \
curl https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar -O && \
popd

RUN echo "-> Install JARs: Flink's Hive connector" && \
mkdir -p ./lib/hive && pushd $_ && \
curl https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar -O && \
popd

RUN echo "-> Install JARs: Dependencies for Delta Lake" && \
mkdir -p ./lib/delta && pushd $_ && \
curl https://repo1.maven.org/maven2/io/delta/delta-flink/3.2.0/delta-flink-3.2.0.jar -O && \
curl https://repo1.maven.org/maven2/io/delta/delta-standalone_2.12/3.2.0/delta-standalone_2.12-3.2.0.jar -O && \
curl https://repo1.maven.org/maven2/io/delta/delta-storage/3.2.0/delta-storage-3.2.0.jar -O && \
curl https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet/1.18.1/flink-sql-parquet-1.18.1.jar -O && \
curl https://repo1.maven.org/maven2/com/chuusai/shapeless_2.12/2.3.4/shapeless_2.12-2.3.4.jar -O && \
popd

RUN echo "-> Install JARs: AWS / Hadoop S3" && \
mkdir -p ./lib/aws && pushd $_ && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -O && \
curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.648/aws-java-sdk-bundle-1.12.648.jar -O && \
popd

RUN echo "-> Install JARs: Hadoop" && \
mkdir -p ./lib/hadoop && pushd $_ && \
curl https://repo1.maven.org/maven2/com/google/guava/guava/27.0-jre/guava-27.0-jre.jar -O && \
curl https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.12.7/jackson-databind-2.12.7.jar -O && \
curl https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.17.1/jackson-core-2.17.1.jar -O && \
curl https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.12.7/jackson-annotations-2.12.7.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.1.1/commons-configuration2-2.1.1.jar -O && \
curl https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.3.4/hadoop-auth-3.3.4.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.4/hadoop-common-3.3.4.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/thirdparty/hadoop-shaded-guava/1.1.1/hadoop-shaded-guava-1.1.1.jar -O && \
curl https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.1/stax2-api-4.2.1.jar -O && \
curl https://repo1.maven.org/maven2/com/fasterxml/woodstox/woodstox-core/5.3.0/woodstox-core-5.3.0.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.3.4/hadoop-hdfs-client-3.3.4.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.3.4/hadoop-mapreduce-client-core-3.3.4.jar -O && \
popd

# Set the launch command
CMD ./bin/start-cluster.sh && sleep infinity
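A quick way to check the built image, sketched on the assumption that the stack is up: the S3 filesystem plugin should sit in ./plugins/s3-fs-hadoop and the downloaded connector and Hadoop JARs in the ./lib subdirectories created above.

# Sketch: confirm the plugin and JAR layout inside the running jobmanager container.
docker compose build jobmanager
docker compose up -d jobmanager
docker exec jobmanager ls ./plugins/s3-fs-hadoop ./lib/delta ./lib/hive ./lib/kafka ./lib/aws ./lib/hadoop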
34 changes: 34 additions & 0 deletions flink-delta-lake/flink/conf/hive-site.xml
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hms:9083</value>
    </property>

    <!-- <property>
        <name>fs.s3a.access.key</name>
        <value>admin</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>password</value>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
    </property>
    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property> -->

</configuration>
14 changes: 14 additions & 0 deletions flink-delta-lake/hms-standalone-s3/Dockerfile
@@ -0,0 +1,14 @@
FROM hms-standalone

RUN apt-get update && apt-get install -y curl rlwrap vim

RUN cd /opt/hive-metastore/lib && \
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -O && \
curl https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar -O && \
curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.648/aws-java-sdk-bundle-1.12.648.jar -O

COPY conf/hive-site.xml /opt/hive-metastore/conf/hive-site.xml

RUN cd ~ && \
curl https://archive.apache.org/dist/db/derby/db-derby-10.14.2.0/db-derby-10.14.2.0-bin.tar.gz -o db-derby-10.14.2.0-bin.tar.gz && \
tar xf db-derby-10.14.2.0-bin.tar.gz
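The hive-metastore service in docker-compose.yml builds this image. As a hedged sketch, assuming the hms-standalone base image has already been built locally, you can watch the metastore start and confirm the published Thrift port is reachable:

# Sketch: start only the metastore and check the Thrift port it publishes on the host.
docker compose up -d hive-metastore
docker compose logs -f hive-metastore    # wait for the metastore to report it is listening on 9083
nc -vz localhost 9083                     # assumes netcat is available on the host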
65 changes: 65 additions & 0 deletions flink-delta-lake/hms-standalone-s3/conf/hive-site.xml
@@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!-- These are default values meant to allow easy smoke testing of the metastore. You will
     likely need to add a number of new values. -->
<configuration>
    <property>
        <name>metastore.thrift.uris</name>
        <value>thrift://hms:9083</value>
    </property>
    <!-- Add Materialization stuff for standalone metastore -->
    <property>
        <name>metastore.task.threads.always</name>
        <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
    </property>
    <property>
        <name>metastore.expression.proxy</name>
        <value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
    </property>

    <!-- Derby embedded DB -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:derby:;databaseName=/tmp/metastore_db;create=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.apache.derby.jdbc.EmbeddedDriver</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <value>admin</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>password</value>
    </property>

    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
    </property>

    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>

</configuration>
