From 40f274be69fd1e9995b550cbc397663b0f1cc1aa Mon Sep 17 00:00:00 2001 From: Xuannan Date: Thu, 21 Sep 2023 21:32:11 +0800 Subject: [PATCH] Update flink-yarn-application example to use multiple python files This closes #38. --- flink-yarn-application/README.md | 14 ++++++------ flink-yarn-application/{ => code}/main.py | 9 ++++++++ flink-yarn-application/code/transforms.py | 22 +++++++++++++++++++ .../data/expected_output.txt | 10 ++++----- flink-yarn-application/run_and_verify.sh | 4 ++-- 5 files changed, 45 insertions(+), 14 deletions(-) rename flink-yarn-application/{ => code}/main.py (94%) create mode 100644 flink-yarn-application/code/transforms.py diff --git a/flink-yarn-application/README.md b/flink-yarn-application/README.md index fa2af3b..ffc1178 100644 --- a/flink-yarn-application/README.md +++ b/flink-yarn-application/README.md @@ -74,7 +74,7 @@ folder to run this example. $ ./hadoop-3.3.6/bin/hdfs dfs -put venv.zip /venv.zip $ rm -rf venv venv.zip $ ./hadoop-3.3.6/bin/hdfs dfs -put data /data - $ ./hadoop-3.3.6/bin/hdfs dfs -put main.py /main.py + $ ./hadoop-3.3.6/bin/hdfs dfs -put code /code ``` 3. Submit the Feathub job to Yarn cluster @@ -89,7 +89,7 @@ folder to run this example. -pyarch hdfs:///venv.zip \ -pyclientexec venv.zip/venv/bin/python3 \ -pyexec venv.zip/venv/bin/python3 \ - -pyfs hdfs:///main.py \ + -pyfs hdfs:///code \ -pym main ``` @@ -108,11 +108,11 @@ folder to run this example. The file should contain the following rows: ``` - user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 - user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 - user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 - user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 - user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 + user_1,item_1,1,"2022-01-01 00:00:00",user_1item_1,100.0,100.0 + user_1,item_2,2,"2022-01-01 00:01:00",user_1item_2,200.0,500.0 + user_1,item_1,3,"2022-01-01 00:02:00",user_1item_1,200.0,1100.0 + user_2,item_1,1,"2022-01-01 00:03:00",user_2item_1,300.0,300.0 + user_1,item_3,2,"2022-01-01 00:04:00",user_1item_3,300.0,1200.0 ``` 5. Tear down the Yarn cluster. diff --git a/flink-yarn-application/main.py b/flink-yarn-application/code/main.py similarity index 94% rename from flink-yarn-application/main.py rename to flink-yarn-application/code/main.py index 5406bbb..d9fe076 100644 --- a/flink-yarn-application/main.py +++ b/flink-yarn-application/code/main.py @@ -26,6 +26,8 @@ from feathub.feature_tables.sources.file_system_source import FileSystemSource from feathub.table.schema import Schema +from transforms import concat_user_id_item_id_transform + if __name__ == "__main__": client = FeathubClient( props={ @@ -99,10 +101,17 @@ ), ) + f_user_id_item_id = Feature( + name="user_id_item_id", + transform=concat_user_id_item_id_transform, + dtype=types.String, + ) + purchase_events_with_features = DerivedFeatureView( name="purchase_events_with_features", source=purchase_events_source, features=[ + f_user_id_item_id, "item_price_events.price", f_total_payment_last_two_minutes, ], diff --git a/flink-yarn-application/code/transforms.py b/flink-yarn-application/code/transforms.py new file mode 100644 index 0000000..b8ddfb0 --- /dev/null +++ b/flink-yarn-application/code/transforms.py @@ -0,0 +1,22 @@ +# Copyright 2022 The FeatHub Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from feathub.feature_views.transforms.python_udf_transform import PythonUdfTransform +from pandas import Series + + +def concat_user_id_item_id(row: Series) -> str: + return row["user_id"] + row["item_id"] + + +concat_user_id_item_id_transform = PythonUdfTransform(concat_user_id_item_id) diff --git a/flink-yarn-application/data/expected_output.txt b/flink-yarn-application/data/expected_output.txt index 7fd8a21..63809c3 100644 --- a/flink-yarn-application/data/expected_output.txt +++ b/flink-yarn-application/data/expected_output.txt @@ -1,5 +1,5 @@ -user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0 -user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0 -user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0 -user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0 -user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0 \ No newline at end of file +user_1,item_1,1,"2022-01-01 00:00:00",user_1item_1,100.0,100.0 +user_1,item_2,2,"2022-01-01 00:01:00",user_1item_2,200.0,500.0 +user_1,item_1,3,"2022-01-01 00:02:00",user_1item_1,200.0,1100.0 +user_2,item_1,1,"2022-01-01 00:03:00",user_2item_1,300.0,300.0 +user_1,item_3,2,"2022-01-01 00:04:00",user_1item_3,300.0,1200.0 \ No newline at end of file diff --git a/flink-yarn-application/run_and_verify.sh b/flink-yarn-application/run_and_verify.sh index a0f1926..575ed5e 100755 --- a/flink-yarn-application/run_and_verify.sh +++ b/flink-yarn-application/run_and_verify.sh @@ -48,7 +48,7 @@ zip -q -r venv.zip venv ./hadoop-3.3.6/bin/hdfs dfs -put venv.zip /venv.zip rm -rf venv venv.zip ./hadoop-3.3.6/bin/hdfs dfs -put data /data -./hadoop-3.3.6/bin/hdfs dfs -put main.py /main.py +./hadoop-3.3.6/bin/hdfs dfs -put code /code echo Submitting Feathub job to Yarn cluster curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz @@ -61,7 +61,7 @@ HADOOP_CLASSPATH=$(hadoop-3.3.6/bin/hadoop classpath) \ -pyarch hdfs:///venv.zip \ -pyclientexec venv.zip/venv/bin/python3 \ -pyexec venv.zip/venv/bin/python3 \ - -pyfs hdfs:///main.py \ + -pyfs hdfs:///code \ -pym main APPLICATION_ID=$(./hadoop-3.3.6/bin/yarn app -list | grep feathub_job | awk '{print $1}')