Add sample PySpark job #47

Merged: 1 commit, Mar 13, 2024

3 changes: 2 additions & 1 deletion .gitignore
@@ -1,2 +1,3 @@
gable_tutorial
__pycache__
__pycache__
.venv
36 changes: 36 additions & 0 deletions pyspark/README.md
@@ -0,0 +1,36 @@
# PySpark

## Setup

From the `pyspark` folder:

1. Start the Hive container, which is automatically seeded with example table schemas (an optional check to confirm seeding completed is shown after these setup steps)

```bash
docker-compose -f ./docker/docker-compose.yml up -d
```

2. Create and activate a virtual environment, then install the dependencies

```bash
python3 -m venv ".venv"
source ".venv/bin/activate"
pip3 install --pre -r requirements.txt
```
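
Once both steps are done, you can optionally confirm that the seed script finished. This is a minimal check, relying on the `hive` container name from `docker/docker-compose.yml` and the "Database seeded." message printed by `docker/init-hive.sh`:

```bash
# Should print "Database seeded." once init-hive.sh has loaded the example tables
docker logs hive | grep "Database seeded."
```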

## Register PySpark Job Output Tables

To register the PySpark job's output tables and their schemas, run the following command:

```bash
gable data-asset register --source-type pyspark \
--project-root . \
--spark-job-entrypoint "job.py --final_output_table pnw_bookings_30_days" \
--connection-string hive://localhost:10000
```

`--project-root`: The path to the root of the Python project containing the PySpark job to run.

`--spark-job-entrypoint`: The name of the entrypoint script for the PySpark job, as well as any arguments needed to run the job. If your Spark job uses config values from `SparkConf`, you can set the config values using the [normal Spark syntax](https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties) of `--conf spark.my_config_key=config_value`.
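
For example, assuming the `--conf` pair is passed along with the rest of the entrypoint arguments (an assumption about how gable forwards them; `spark.my_config_key` is just the illustrative key from the Spark docs, not a real setting), the command might look like:

```bash
# Sketch only: appends an illustrative --conf pair to the job's entrypoint arguments
gable data-asset register --source-type pyspark \
    --project-root . \
    --spark-job-entrypoint "job.py --final_output_table pnw_bookings_30_days --conf spark.my_config_key=config_value" \
    --connection-string hive://localhost:10000
```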

`--connection-string`: The [SQLAlchemy connection string](https://pypi.org/project/PyHive/) to connect to your Hive instance. Knowing the schemas of the Spark job's input tables is required to compute the job's final output schemas.
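
If registration fails to connect, one way to sanity-check the Hive endpoint is to run a query against it directly. The sketch below reuses the `hive` container name and the `beeline` client that `docker/init-hive.sh` already uses:

```bash
# Should list the seeded tables (area, booking, city, ...) if HiveServer2 is reachable on localhost:10000
docker exec hive beeline -u jdbc:hive2://localhost:10000 -e "SHOW TABLES;"
```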
12 changes: 12 additions & 0 deletions pyspark/docker/docker-compose.yml
@@ -0,0 +1,12 @@
version: '3'
services:
  hive:
    image: apache/hive:4.0.0-beta-1
    container_name: hive
    ports:
      - "10000:10000"
    environment:
      - SERVICE_NAME=hiveserver2
    volumes:
      - ./:/opt/hive-scripts/
    entrypoint: /opt/hive-scripts/init-hive.sh
36 changes: 36 additions & 0 deletions pyspark/docker/init-hive.sh
@@ -0,0 +1,36 @@
#!/bin/bash

echo "Starting Hive"
pkill -f /opt/hive/bin/hive || true
/entrypoint.sh &
echo "Hive initialization was successful."

echo "Waiting for Hive server to start..."
SUCCESS=0
ATTEMPTS=0
MAX_ATTEMPTS=30

while [ $SUCCESS -eq 0 -a $ATTEMPTS -lt $MAX_ATTEMPTS ]; do
    echo "Attempting to connect to Hive server (Attempt $((ATTEMPTS+1))/$MAX_ATTEMPTS)..."
    if echo "SHOW DATABASES;" | beeline -u jdbc:hive2://localhost:10000 > /dev/null 2>&1; then
        SUCCESS=1
        echo "Hive server is up and running."
    else
        ATTEMPTS=$((ATTEMPTS+1))
        sleep 2
    fi
done

if [ $SUCCESS -eq 0 ]; then
    echo "Failed to connect to Hive server after $MAX_ATTEMPTS attempts."
    exit 1
fi

echo "Seeding Hive database..."

beeline -u jdbc:hive2://localhost:10000 -n root -p '' -f /opt/hive-scripts/seed_hive.txt

echo "Database seeded."

# Keep the container running until stopped
tail -f /dev/null
79 changes: 79 additions & 0 deletions pyspark/docker/seed_hive.txt
@@ -0,0 +1,79 @@
CREATE TABLE IF NOT EXISTS area (
subcity_name STRING,
area_name STRING,
area_id STRING,
geohash STRING,
city_id STRING
);

CREATE TABLE IF NOT EXISTS city (
latitude DOUBLE,
windows_time_zone STRING,
territory STRING,
is_capital BOOLEAN,
city_code STRING,
region STRING,
longitude DOUBLE,
created_at TIMESTAMP,
city_name STRING,
city_id STRING,
updated_at TIMESTAMP,
country_id STRING,
parent_city_id STRING,
enable_tip BOOLEAN,
time_zone STRING
);


CREATE TABLE IF NOT EXISTS enterprise_metrics (
order_id STRING,
date_id STRING,
booking_code STRING,
booking_channel STRING,
last_group_id STRING,
array_enterprise_reward_id STRING,
product STRING
);

CREATE TABLE IF NOT EXISTS booking (
city_id STRING,
earning_adjustment_commission_rate DOUBLE,
booking_broadcast_time TIMESTAMP,
receipt_payment_type STRING,
confirm_time TIMESTAMP,
date_id STRING,
reward_id BIGINT
);

CREATE TABLE IF NOT EXISTS generic_metrics (
driver_longitude DOUBLE,
payment_type_id STRING,
confirm_time TIMESTAMP,
ignored_candidates STRING,
enterprise_campaign_code STRING,
cancel_types_used STRING,
is_soft_allocated BOOLEAN,
ar_denominator STRING,
actual_distance_of_trip DOUBLE,
sender_type STRING,
cancellation_fees DOUBLE,
city_id STRING,
created_at_utc TIMESTAMP
);

CREATE TABLE IF NOT EXISTS orders (
merchant_order_id STRING,
total_cancels_for_order BIGINT,
pickup_time_to TIMESTAMP,
dropoff_longitude DOUBLE,
is_gpc BOOLEAN,
user_type STRING,
regular_delivery_provider STRING,
city_id STRING,
updated_at TIMESTAMP,
first_allocation_timestamp TIMESTAMP,
cancelled_time TIMESTAMP,
cancel_types_used STRING,
pending_dropoff_time TIMESTAMP,
cod_type STRING
);
19 changes: 19 additions & 0 deletions pyspark/helpers.py
@@ -0,0 +1,19 @@
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, concat, lit, floor, rand
from pyspark.sql import functions as F


def get_cities_by_region(spark, region):
    # Look up cities for the given region from the Hive `city` table
    region_cities_df = (
        spark.table("city")
        .select(
            "city_id",
            "city_name",
            "city_code",
            "country_id",
            "region",
            "created_at",
            "enable_tip"
        )
        .filter('region="{}"'.format(region))
    )
    return region_cities_df
22 changes: 22 additions & 0 deletions pyspark/job.py
@@ -0,0 +1,22 @@
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from script import run_job


def parse_arguments(argv=None):
    ap = argparse.ArgumentParser()
    ap.add_argument("--final_output_table")
    return ap.parse_args(argv)


if __name__ == "__main__":
    # Parse args
    args_main = parse_arguments()
    final_output_table = args_main.final_output_table
    print(f"final_output_table: {final_output_table}")

    spark = SparkSession.builder.getOrCreate()
    run_job(spark, final_output_table)

5 changes: 5 additions & 0 deletions pyspark/requirements.txt
@@ -0,0 +1,5 @@
numpy==1.26.4
pandas==2.2.1
pyspark==3.5.1
sqlglot==22.0.0
gable==0.10.0a0
57 changes: 57 additions & 0 deletions pyspark/script.py
@@ -0,0 +1,57 @@
from pyspark.sql import functions as F

from helpers import get_cities_by_region


def run_job(spark, final_output_table):

    # Filter to cities in the Pacific Northwest
    pnw_cities_df = get_cities_by_region(spark, "PNW")

    # Read bookings from the last 30 days
    bookings_df = spark.sql("SELECT * FROM booking WHERE confirm_time >= date_sub(current_date(), 30)")

    # Join the bookings with the cities
    pnw_cities_df.createOrReplaceTempView("pnw_cities")
    bookings_df.createOrReplaceTempView("bookings_30_days")
    joined_df = spark.sql("""
        SELECT
            timestamp(date_trunc('DD', b.confirm_time)) AS booking_date,
            b.earning_adjustment_commission_rate AS commission_rate,
            b.receipt_payment_type AS payment_type,
            b.reward_id AS reward_id,
            c.*
        FROM
            bookings_30_days b
            JOIN pnw_cities c
                ON b.city_id = c.city_id
    """)

    # Write the joined data to the final output table
    joined_df.write.mode("overwrite").saveAsTable(final_output_table)

    # Aggregate daily commission-rate metrics from the joined data
    joined_df.createOrReplaceTempView("joined_temp")
    commission_rate_metrics_df = spark.sql("""
        SELECT
            booking_date,
            avg(commission_rate) AS avg_commission_rate,
            max(commission_rate) AS max_commission_rate,
            min(commission_rate) AS min_commission_rate
        FROM
            joined_temp
        GROUP BY
            booking_date
    """)

    # Merge into existing metrics table
    commission_rate_metrics_df.createOrReplaceTempView("commission_rate_metrics_temp")
    spark.sql("""
        MERGE INTO commission_rate_metrics m
        USING commission_rate_metrics_temp t
        ON m.booking_date = t.booking_date
        WHEN MATCHED THEN
            UPDATE SET
                m.avg_commission_rate = t.avg_commission_rate,
                m.max_commission_rate = t.max_commission_rate,
                m.min_commission_rate = t.min_commission_rate
        WHEN NOT MATCHED THEN
            INSERT (booking_date, avg_commission_rate, max_commission_rate, min_commission_rate)
            VALUES (t.booking_date, t.avg_commission_rate, t.max_commission_rate, t.min_commission_rate)
    """)