From 95ff6e91a0014d1ba21413cc326391f810c297d2 Mon Sep 17 00:00:00 2001
From: Adrian Kreuziger
Date: Tue, 12 Mar 2024 21:04:45 -0700
Subject: [PATCH] Add sample PySpark job

---
 .gitignore                        |  3 +-
 pyspark/README.md                 | 36 ++++++++++++++
 pyspark/docker/docker-compose.yml | 12 +++++
 pyspark/docker/init-hive.sh       | 36 ++++++++++++++
 pyspark/docker/seed_hive.txt      | 79 +++++++++++++++++++++++++++++++
 pyspark/helpers.py                | 19 ++++++++
 pyspark/job.py                    | 22 +++++++++
 pyspark/requirements.txt          |  5 ++
 pyspark/script.py                 | 59 ++++++++++++++++++++++
 9 files changed, 270 insertions(+), 1 deletion(-)
 create mode 100644 pyspark/README.md
 create mode 100644 pyspark/docker/docker-compose.yml
 create mode 100755 pyspark/docker/init-hive.sh
 create mode 100644 pyspark/docker/seed_hive.txt
 create mode 100644 pyspark/helpers.py
 create mode 100644 pyspark/job.py
 create mode 100644 pyspark/requirements.txt
 create mode 100644 pyspark/script.py

diff --git a/.gitignore b/.gitignore
index 3bcd366..796db6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 gable_tutorial
-__pycache__
\ No newline at end of file
+__pycache__
+.venv
\ No newline at end of file
diff --git a/pyspark/README.md b/pyspark/README.md
new file mode 100644
index 0000000..d8a9ea4
--- /dev/null
+++ b/pyspark/README.md
@@ -0,0 +1,36 @@
+# PySpark
+
+## Setup
+
+From the `pyspark` folder:
+
+1. Start the Hive container, which is automatically seeded with example table schemas
+
+    ```bash
+    docker-compose -f ./docker/docker-compose.yml up -d
+    ```
+
+2. Create and activate a virtual environment, then install dependencies
+
+    ```bash
+    python3 -m venv ".venv"
+    source ".venv/bin/activate"
+    pip3 install --pre -r requirements.txt
+    ```
+
+## Register PySpark Job Output Tables
+
+To register the PySpark job's output tables and their schemas, run the following command:
+
+```bash
+gable data-asset register --source-type pyspark \
+    --project-root . \
+    --spark-job-entrypoint "job.py --final_output_table pnw_bookings_30_days" \
+    --connection-string hive://localhost:10000
+```
+
+`--project-root`: The path to the root of the Python project containing the PySpark job to run.
+
+`--spark-job-entrypoint`: The name of the entrypoint script for the PySpark job, plus any arguments needed to run the job. If your Spark job reads config values from `SparkConf`, you can set them using the [normal Spark syntax](https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties) of `--conf spark.my_config_key=config_value`.
+
+`--connection-string`: The [SQLAlchemy connection string](https://pypi.org/project/PyHive/) used to connect to your Hive instance. The schemas of the Spark job's input tables are required to compute the job's final output schemas.
diff --git a/pyspark/docker/docker-compose.yml b/pyspark/docker/docker-compose.yml
new file mode 100644
index 0000000..3d4b067
--- /dev/null
+++ b/pyspark/docker/docker-compose.yml
@@ -0,0 +1,12 @@
+version: '3'
+services:
+  hive:
+    image: apache/hive:4.0.0-beta-1
+    container_name: hive
+    ports:
+      - "10000:10000"
+    environment:
+      - SERVICE_NAME=hiveserver2
+    volumes:
+      - ./:/opt/hive-scripts/
+    entrypoint: /opt/hive-scripts/init-hive.sh
\ No newline at end of file
diff --git a/pyspark/docker/init-hive.sh b/pyspark/docker/init-hive.sh
new file mode 100755
index 0000000..d4c196d
--- /dev/null
+++ b/pyspark/docker/init-hive.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+echo "Starting Hive"
+pkill -f /opt/hive/bin/hive || true
+/entrypoint.sh &
+echo "Hive server starting in the background."
+
+echo "Waiting for Hive server to start..."
+SUCCESS=0
+ATTEMPTS=0
+MAX_ATTEMPTS=30
+
+while [ $SUCCESS -eq 0 -a $ATTEMPTS -lt $MAX_ATTEMPTS ]; do
+    echo "Attempting to connect to Hive server (Attempt $((ATTEMPTS+1))/$MAX_ATTEMPTS)..."
+    if echo "SHOW DATABASES;" | beeline -u jdbc:hive2://localhost:10000 > /dev/null 2>&1; then
+        SUCCESS=1
+        echo "Hive server is up and running."
+    else
+        ATTEMPTS=$((ATTEMPTS+1))
+        sleep 2
+    fi
+done
+
+if [ $SUCCESS -eq 0 ]; then
+    echo "Failed to connect to Hive server after $MAX_ATTEMPTS attempts."
+    exit 1
+fi
+
+echo "Seeding Hive database..."
+
+beeline -u jdbc:hive2://localhost:10000 -n root -p '' -f /opt/hive-scripts/seed_hive.txt
+
+echo "Database seeded."
+
+# Keep the container running until stopped
+tail -f /dev/null
\ No newline at end of file
diff --git a/pyspark/docker/seed_hive.txt b/pyspark/docker/seed_hive.txt
new file mode 100644
index 0000000..17128bf
--- /dev/null
+++ b/pyspark/docker/seed_hive.txt
@@ -0,0 +1,79 @@
+CREATE TABLE IF NOT EXISTS area (
+    subcity_name STRING,
+    area_name STRING,
+    area_id STRING,
+    geohash STRING,
+    city_id STRING
+);
+
+CREATE TABLE IF NOT EXISTS city (
+    latitude DOUBLE,
+    windows_time_zone STRING,
+    territory STRING,
+    is_capital BOOLEAN,
+    city_code STRING,
+    region STRING,
+    longitude DOUBLE,
+    created_at TIMESTAMP,
+    city_name STRING,
+    city_id STRING,
+    updated_at TIMESTAMP,
+    country_id STRING,
+    parent_city_id STRING,
+    enable_tip BOOLEAN,
+    time_zone STRING
+);
+
+
+CREATE TABLE IF NOT EXISTS enterprise_metrics (
+    order_id STRING,
+    date_id STRING,
+    booking_code STRING,
+    booking_channel STRING,
+    last_group_id STRING,
+    array_enterprise_reward_id STRING,
+    product STRING
+);
+
+CREATE TABLE IF NOT EXISTS booking (
+    city_id STRING,
+    earning_adjustment_commission_rate DOUBLE,
+    booking_broadcast_time TIMESTAMP,
+    receipt_payment_type STRING,
+    confirm_time TIMESTAMP,
+    date_id STRING,
+    reward_id BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS generic_metrics (
+    driver_longitude DOUBLE,
+    payment_type_id STRING,
+    confirm_time TIMESTAMP,
+    ignored_candidates STRING,
+    enterprise_campaign_code STRING,
+    cancel_types_used STRING,
+    is_soft_allocated BOOLEAN,
+    ar_denominator STRING,
+    actual_distance_of_trip DOUBLE,
+    sender_type STRING,
+    cancellation_fees DOUBLE,
+    city_id STRING,
+    created_at_utc TIMESTAMP
+);
+
+CREATE TABLE IF NOT EXISTS orders (
+    merchant_order_id STRING,
+    total_cancels_for_order BIGINT,
+    pickup_time_to TIMESTAMP,
+    dropoff_longitude DOUBLE,
+    is_gpc BOOLEAN,
+    user_type STRING,
+    regular_delivery_provider STRING,
+    city_id STRING,
+    updated_at TIMESTAMP,
+    first_allocation_timestamp TIMESTAMP,
+    cancelled_time TIMESTAMP,
+    cancel_types_used STRING,
+    pending_dropoff_time TIMESTAMP,
+    cod_type STRING
+);
diff --git a/pyspark/helpers.py b/pyspark/helpers.py
new file mode 100644
index 0000000..024cd49
--- /dev/null
+++ b/pyspark/helpers.py
@@ -0,0 +1,19 @@
+from pyspark.sql.session import SparkSession
+from pyspark.sql.functions import col, concat, lit, floor, rand
+from pyspark.sql import functions as F
+
+def get_cities_by_region(spark, region):
+    region_cities_df = (
+        spark.table("city")
+        .select(
+            "city_id",
+            "city_name",
+            "city_code",
+            "country_id",
+            "region",
+            "created_at",
+            "enable_tip"
+        )
+        .filter(col("region") == region)
+    )
+    return region_cities_df
\ No newline at end of file
diff --git a/pyspark/job.py b/pyspark/job.py
new file mode 100644
index 0000000..0b93123
--- /dev/null
+++ b/pyspark/job.py
@@ -0,0 +1,22 @@
+import argparse
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+
+from script import run_job
+
+def parse_arguments(argv=None):
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--final_output_table")
+    return ap.parse_args(argv)
+
+
+if __name__ == "__main__":
+    # Parse args
+    args_main = parse_arguments()
+    final_output_table = args_main.final_output_table
+    print(f"final_output_table: {final_output_table}")
+
+    spark = SparkSession.builder.getOrCreate()
+    run_job(spark, final_output_table)
+
diff --git a/pyspark/requirements.txt b/pyspark/requirements.txt
new file mode 100644
index 0000000..77bc2b3
--- /dev/null
+++ b/pyspark/requirements.txt
@@ -0,0 +1,5 @@
+numpy==1.26.4
+pandas==2.2.1
+pyspark==3.5.1
+sqlglot==22.0.0
+gable==0.10.0a0
\ No newline at end of file
diff --git a/pyspark/script.py b/pyspark/script.py
new file mode 100644
index 0000000..8f27983
--- /dev/null
+++ b/pyspark/script.py
@@ -0,0 +1,59 @@
+from pyspark.sql import functions as F
+
+from helpers import get_cities_by_region
+
+def run_job(spark, final_output_table):
+
+    # Filter to cities in the Pacific Northwest
+    pnw_cities_df = get_cities_by_region(spark, "PNW")
+
+    # Read bookings from the last 30 days
+    bookings_df = spark.sql("SELECT * FROM booking WHERE confirm_time >= date_sub(current_date(), 30)")
+
+    # Join the bookings with the cities
+    pnw_cities_df.createOrReplaceTempView("pnw_cities")
+    bookings_df.createOrReplaceTempView("bookings_30_days")
+    joined_df = spark.sql("""
+        SELECT
+            date_trunc('DD', b.confirm_time) AS booking_date,
+            b.earning_adjustment_commission_rate AS commission_rate,
+            b.receipt_payment_type AS payment_type,
+            b.reward_id AS reward_id,
+            c.*
+        FROM
+            bookings_30_days b
+            JOIN pnw_cities c
+                ON b.city_id = c.city_id
+    """)
+
+    # Write the joined data to the final output table
+    joined_df.write.mode("overwrite").saveAsTable(final_output_table)
+
+    # Compute daily commission rate metrics from the joined data
+    joined_df.createOrReplaceTempView("joined_temp")
+    commission_rate_metrics_df = spark.sql("""
+        SELECT
+            booking_date,
+            avg(commission_rate) AS avg_commission_rate,
+            max(commission_rate) AS max_commission_rate,
+            min(commission_rate) AS min_commission_rate
+        FROM
+            joined_temp
+        GROUP BY booking_date
+    """)
+
+    # Merge into existing metrics table
+    commission_rate_metrics_df.createOrReplaceTempView("commission_rate_metrics_temp")
+    spark.sql("""
+        MERGE INTO commission_rate_metrics m
+        USING commission_rate_metrics_temp t
+        ON m.booking_date = t.booking_date
+        WHEN MATCHED THEN
+            UPDATE SET
+                m.avg_commission_rate = t.avg_commission_rate,
+                m.max_commission_rate = t.max_commission_rate,
+                m.min_commission_rate = t.min_commission_rate
+        WHEN NOT MATCHED THEN
+            INSERT (booking_date, avg_commission_rate, max_commission_rate, min_commission_rate)
+            VALUES (t.booking_date, t.avg_commission_rate, t.max_commission_rate, t.min_commission_rate)
+    """)
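
Before running the register command, it can help to confirm that the seeded tables are actually visible on the HiveServer2 port that the `hive://localhost:10000` connection string points at. The sketch below is illustrative and not part of the patch: it assumes PyHive is installed separately (for example `pip install "pyhive[hive]"`), and the table names come from `seed_hive.txt`.

```python
# Illustrative sanity check (not part of the patch): query the dockerized
# HiveServer2 instance directly to confirm the seed script ran.
from pyhive import hive  # assumes PyHive is installed separately

# Same host/port as the README's hive://localhost:10000 connection string;
# "root" mirrors the user that init-hive.sh passes to beeline.
conn = hive.Connection(host="localhost", port=10000, username="root")
cursor = conn.cursor()

cursor.execute("SHOW TABLES")
print(cursor.fetchall())  # expect rows for area, booking, city, enterprise_metrics, generic_metrics, orders

cursor.execute("DESCRIBE booking")
for column_name, column_type, _comment in cursor.fetchall():
    print(column_name, column_type)

cursor.close()
conn.close()
```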