From a815dbdb667a11aa4f33915b929b50be8bb21556 Mon Sep 17 00:00:00 2001 From: sxnan Date: Mon, 26 Jun 2023 11:30:02 +0800 Subject: [PATCH] Add kubernetes application example --- .github/workflows/build-and-test.yaml | 5 + flink-kubernetes-application/Dockerfile | 24 ++++ flink-kubernetes-application/README.md | 116 ++++++++++++++++ .../data/expected_output.txt | 5 + .../data/item_price_events.json | 6 + .../data/purchase_events.json | 5 + flink-kubernetes-application/main.py | 127 ++++++++++++++++++ .../pod-template.yaml | 13 ++ .../run_and_verify.sh | 46 +++++++ 9 files changed, 347 insertions(+) create mode 100644 flink-kubernetes-application/Dockerfile create mode 100644 flink-kubernetes-application/README.md create mode 100644 flink-kubernetes-application/data/expected_output.txt create mode 100644 flink-kubernetes-application/data/item_price_events.json create mode 100644 flink-kubernetes-application/data/purchase_events.json create mode 100644 flink-kubernetes-application/main.py create mode 100644 flink-kubernetes-application/pod-template.yaml create mode 100755 flink-kubernetes-application/run_and_verify.sh diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index 8646255..8ea44dc 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -22,6 +22,11 @@ jobs: python_version: ["3.7", "3.8", "3.9"] steps: - uses: actions/checkout@v3 + - name: start minikube + uses: medyagh/setup-minikube@latest + with: + cpus: 2 + memory: 4000m - name: Set up python ${{ inputs.python_version }} uses: actions/setup-python@v4 with: diff --git a/flink-kubernetes-application/Dockerfile b/flink-kubernetes-application/Dockerfile new file mode 100644 index 0000000..df75d01 --- /dev/null +++ b/flink-kubernetes-application/Dockerfile @@ -0,0 +1,24 @@ +FROM flink:1.16.2 + +# install python3: Debian 11 ships Python 3.9, so build and install Python 3.7 from source +# it currently only supports Python 
3.6, 3.7 and 3.8 in PyFlink officially. +RUN apt-get update -y && \ + apt-get install -y --no-install-recommends build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \ + wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \ + tar -xvf Python-3.7.9.tgz && \ + cd Python-3.7.9 && \ + ./configure --without-tests --enable-shared && \ + make -j"$(nproc)" && \ + make install && \ + ldconfig /usr/local/lib && \ + cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \ + ln -s /usr/local/bin/python3 /usr/local/bin/python && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# install PyFlink and FeatHub; --no-cache-dir keeps the pip download cache out of the image layer +RUN pip3 install --no-cache-dir apache-flink==1.16.2 "feathub-nightly[flink]" + +COPY main.py /main.py + +COPY data /tmp/data diff --git a/flink-kubernetes-application/README.md b/flink-kubernetes-application/README.md new file mode 100644 index 0000000..3d180c7 --- /dev/null +++ b/flink-kubernetes-application/README.md @@ -0,0 +1,116 @@ +# Overview + +This example shows how to use `DerivedFeatureView` to backfill the input dataset +with extra features for offline training. It involves the following steps: + +1. Read a batch of historical purchase events from a file. + + Each purchase event has the following fields: + - user_id, unique identifier of the user that made the purchase. + - item_id, unique identifier of the item that is purchased. + - item_count, number of items purchased. + - timestamp, time when this purchase is made. + +2. Read a batch of historical item price events from a file. + + Each item price event has the following fields: + - item_id, unique identifier of the item. + - price, the new price of this item. + - timestamp, time when the new price is used for this item. + +3. For each purchase event, append the following two fields by joining with item + price events and performing over-window aggregation, with point-in-time + correctness in both operations. + + - price, price of the item at the time this purchase is made. 
+ - total_payment_last_two_minutes, total cost of purchases made by this + user in a 2-minute window that ends at the time this purchase is made. + +4. Output the batch of purchase events backfilled with the extra features to a + file. + +The example demonstrates how to submit the Feathub job to a Kubernetes cluster in Flink +native Kubernetes application mode. + +# Prerequisites + +Prerequisites for running this example: +- Unix-like operating system (e.g. Linux, Mac OS X) +- Python 3.7 +- Minikube 1.30.1 +- kubectl 1.25.9 + +# Step-By-Step Instructions + +Please execute the following commands under the `flink-kubernetes-application` +folder to run this example. + +1. Start Minikube + + ```bash + $ minikube start + ``` + + After Minikube has started, you can run `kubectl get ns` to see the namespaces in + the cluster. + +2. Build the Flink image + + Flink Kubernetes application mode requires that the user code is bundled together with + the Flink image, and Feathub must also be installed in the image. You can run the following + command to build the image to be used by Minikube. + + ```bash + eval $(minikube docker-env) + docker build -q --rm -t flink-k8s-app . + ``` + +3. Submit the Feathub job to the Kubernetes cluster + + ```bash + # Create the output directory in Minikube. + $ minikube ssh -- 'mkdir -p -m=777 /tmp/flink-kubernetes-application/output' + + # Grant the default service account permission to create and delete pods. 
+ $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default + + $ curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz + $ tar -xzf flink-1.16.2-bin-scala_2.12.tgz + $ ./flink-1.16.2/bin/flink run-application \ + --target kubernetes-application \ + -Dkubernetes.container.image=flink-k8s-app:latest \ + -Dkubernetes.pod-template-file=./pod-template.yaml \ + -py /main.py + ``` + + Once the job is submitted, you can list the pod that runs the Flink JobManager and + check its log. + + ```bash + # List the running pods. + $ kubectl get pod + + $ kubectl logs <pod-name> + ``` + +4. Check out the outputs. + + ```bash + $ minikube ssh -- 'cat /tmp/flink-kubernetes-application/output/output.json/*' + ``` + + The file should contain the following rows: + + ``` + user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 + user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 + user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 + user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 + user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 + ``` + +5. Tear down Minikube. 
+ + ```bash + minikube stop + ``` diff --git a/flink-kubernetes-application/data/expected_output.txt b/flink-kubernetes-application/data/expected_output.txt new file mode 100644 index 0000000..7fd8a21 --- /dev/null +++ b/flink-kubernetes-application/data/expected_output.txt @@ -0,0 +1,5 @@ +user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 +user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 +user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 +user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 +user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 \ No newline at end of file diff --git a/flink-kubernetes-application/data/item_price_events.json b/flink-kubernetes-application/data/item_price_events.json new file mode 100644 index 0000000..489cfd6 --- /dev/null +++ b/flink-kubernetes-application/data/item_price_events.json @@ -0,0 +1,6 @@ +{"item_id":"item_1", "price":100.0, "timestamp":"2022-01-01 00:00:00"} +{"item_id":"item_2", "price":200.0, "timestamp":"2022-01-01 00:00:00"} +{"item_id":"item_3", "price":300.0, "timestamp":"2022-01-01 00:00:00"} +{"item_id":"item_1", "price":200.0, "timestamp":"2022-01-01 00:01:30"} +{"item_id":"item_1", "price":300.0, "timestamp":"2022-01-01 00:02:30"} +{"item_id":"item_1", "price":400.0, "timestamp":"2022-01-01 00:03:30"} diff --git a/flink-kubernetes-application/data/purchase_events.json b/flink-kubernetes-application/data/purchase_events.json new file mode 100644 index 0000000..cdedf8f --- /dev/null +++ b/flink-kubernetes-application/data/purchase_events.json @@ -0,0 +1,5 @@ +{"user_id":"user_1", "item_id":"item_1", "item_count":1, "timestamp":"2022-01-01 00:00:00"} +{"user_id":"user_1", "item_id":"item_2", "item_count":2, "timestamp":"2022-01-01 00:01:00"} +{"user_id":"user_1", "item_id":"item_1", "item_count":3, "timestamp":"2022-01-01 00:02:00"} +{"user_id":"user_2", "item_id":"item_1", "item_count":1, "timestamp":"2022-01-01 00:03:00"} +{"user_id":"user_1", "item_id":"item_3", "item_count":2, "timestamp":"2022-01-01 00:04:00"} 
\ No newline at end of file diff --git a/flink-kubernetes-application/main.py b/flink-kubernetes-application/main.py new file mode 100644 index 0000000..6d59169 --- /dev/null +++ b/flink-kubernetes-application/main.py @@ -0,0 +1,127 @@ +# Copyright 2022 The FeatHub Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta + +from feathub.feathub_client import FeathubClient +from feathub.feature_tables.sinks.file_system_sink import FileSystemSink +from feathub.feature_views.feature import Feature +from feathub.feature_views.derived_feature_view import DerivedFeatureView +from feathub.feature_views.transforms.over_window_transform import ( + OverWindowTransform, +) + +from feathub.common import types +from feathub.feature_tables.sources.file_system_source import FileSystemSource +from feathub.table.schema import Schema + +if __name__ == "__main__": + client = FeathubClient( + props={ + "processor": { + "type": "flink", + "flink": { + "deployment_mode": "cli", + }, + }, + "online_store": { + "types": ["memory"], + "memory": {}, + }, + "registry": { + "type": "local", + "local": { + "namespace": "default", + }, + }, + "feature_service": { + "type": "local", + "local": {}, + }, + } + ) + + purchase_events_schema = ( + Schema.new_builder() + .column("user_id", types.String) + .column("item_id", types.String) + .column("item_count", types.Int32) + .column("timestamp", types.String) + .build() + ) + + purchase_events_source = FileSystemSource( 
+ name="purchase_events", + path="/tmp/data/purchase_events.json", + data_format="json", + schema=purchase_events_schema, + timestamp_field="timestamp", + timestamp_format="%Y-%m-%d %H:%M:%S", + ) + + item_price_events_schema = ( + Schema.new_builder() + .column("item_id", types.String) + .column("price", types.Float32) + .column("timestamp", types.String) + .build() + ) + + item_price_events_source = FileSystemSource( + name="item_price_events", + path="/tmp/data/item_price_events.json", + data_format="json", + schema=item_price_events_schema, + keys=["item_id"], + timestamp_field="timestamp", + timestamp_format="%Y-%m-%d %H:%M:%S", + ) + + # The total cost of purchases made by this user in the last 2 minutes. + f_total_payment_last_two_minutes = Feature( + name="total_payment_last_two_minutes", + transform=OverWindowTransform( + expr="item_count * price", + agg_func="SUM", + window_size=timedelta(minutes=2), + group_by_keys=["user_id"], + ), + ) + + purchase_events_with_features = DerivedFeatureView( + name="purchase_events_with_features", + source=purchase_events_source, + features=[ + "item_price_events.price", + f_total_payment_last_two_minutes, + ], + keep_source_fields=True, + ) + + client.build_features( + [ + item_price_events_source, + purchase_events_with_features, + ] + ) + + result_table = client.get_features(purchase_events_with_features) + + result_table_df = result_table.to_pandas() + + print(result_table_df) + + local_sink = FileSystemSink(path="/tmp/data/output/output.json", data_format="csv") + + result_table.execute_insert(sink=local_sink, allow_overwrite=True).wait() diff --git a/flink-kubernetes-application/pod-template.yaml b/flink-kubernetes-application/pod-template.yaml new file mode 100644 index 0000000..a57f325 --- /dev/null +++ b/flink-kubernetes-application/pod-template.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Pod +spec: + containers: + - name: flink-main-container + volumeMounts: + - mountPath: /tmp/data/output + name: 
flink-volume-hostpath + volumes: + - name: flink-volume-hostpath + hostPath: + path: /tmp/flink-kubernetes-application/output + type: DirectoryOrCreate \ No newline at end of file diff --git a/flink-kubernetes-application/run_and_verify.sh b/flink-kubernetes-application/run_and_verify.sh new file mode 100755 index 0000000..928f553 --- /dev/null +++ b/flink-kubernetes-application/run_and_verify.sh @@ -0,0 +1,46 @@ +# +# Copyright 2022 The FeatHub Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e + +cd "$(dirname "$0")" +PROJECT_DIR=$(cd "$(pwd)/.."; pwd) +source "${PROJECT_DIR}"/tools/utils.sh + +eval "$(minikube docker-env)" +docker build -q --rm -t flink-k8s-app . 
+minikube ssh -- 'mkdir -p -m=777 /tmp/flink-kubernetes-application/output' +kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default --dry-run=client -o yaml | kubectl apply -f - + +curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz +tar -xzf flink-1.16.2-bin-scala_2.12.tgz +./flink-1.16.2/bin/flink run-application \ + --target kubernetes-application \ + -Dkubernetes.container.image=flink-k8s-app:latest \ + -Dkubernetes.pod-template-file=./pod-template.yaml \ + -Dkubernetes.jobmanager.cpu=0.25 \ + -Dkubernetes.taskmanager.cpu=0.25 \ + -Djobmanager.memory.process.size=1G \ + -Dtaskmanager.memory.process.size=1G \ + -py /main.py + +POD_NAME=$(kubectl get po -l component=jobmanager --no-headers=true | awk 'NR==1 {print $1}') +kubectl wait pods --for=condition=Ready "${POD_NAME}" +kubectl logs -f "${POD_NAME}" + +minikube ssh --native-ssh=false -- 'cat /tmp/flink-kubernetes-application/output/output.json/*' > data/merged_output +sort_and_compare_files data/merged_output data/expected_output.txt +