Merge pull request #50 from gabledata/adriank/pyspark_tutorial_updates
PySpark tutorial updates
adrianisk authored Mar 14, 2024
2 parents f6086cf + 6269311 commit 5198110
Showing 3 changed files with 140 additions and 2 deletions.
78 changes: 77 additions & 1 deletion pyspark/README.md
@@ -1,5 +1,49 @@
# PySpark

## Prerequisites

<details>
<summary>Docker</summary>
<br>

[Link to installation page](https://docs.docker.com/engine/install/)

Once Docker is installed, try running `docker pull apache/hive:4.0.0-beta-1` to verify that the Docker engine is running and that you're able to pull the image needed for the tutorial.
</details>

<details>
<summary>nvm/node</summary>
<br>


The tutorial requires Node.js version 18 or above, which can be installed using `nvm` (Node Version Manager). To check whether you already have `nvm` installed on your machine, run

```bash
nvm --version
```

If `nvm` is not installed, you can run the commands below, taken from [this Medium article](https://medium.com/devops-techable/how-to-install-nvm-node-version-manager-on-macos-with-homebrew-1bc10626181).

```bash
brew update
brew install nvm
mkdir -p ~/.nvm

echo "export NVM_DIR=~/.nvm\nsource \$(brew --prefix nvm)/nvm.sh" >> .zshrc
source ~/.zshrc
nvm --version
```

Once `nvm` is installed, you can install and use any version of Node 18 or above:

```bash
nvm install 18
nvm use 18
node --version
```

</details>

## Setup

From the `pyspark` folder
@@ -12,15 +56,30 @@ From the `pyspark` folder

2. Create and activate a virtual environment, install dependencies

The `gable` CLI requires the active Python environment to have the PySpark job's Python dependencies installed. For this tutorial, we're creating a new Python virtual environment, activating it, and installing the PySpark job's requirements, which are defined in the `requirements.txt` file.
```bash
python3 -m venv ".venv"
source ".venv/bin/activate"
pip3 install --pre -r requirements.txt
```
3. Set your Gable API Key
Log into Gable and navigate to `Settings -> API Keys`. Copy the API endpoint and API key values, then run the following in your terminal window:
```bash
export GABLE_API_ENDPOINT=<copied_api_endpoint>
export GABLE_API_KEY=<copied_api_key>
```
## Register PySpark Job Output Tables
Once the setup is complete, you're ready to register the PySpark job's output tables and their schemas!

The `gable` CLI needs to know the schemas of any tables the PySpark job reads from in order to compute the final output schema(s). There are currently two ways to provide the input schemas: a connection to your Hive cluster, which lets the CLI query the information schema, or a CSV file containing the relevant schemas.
### Hive
```bash
gable data-asset register --source-type pyspark \
@@ -34,3 +93,20 @@ gable data-asset register --source-type pyspark \
`--spark-job-entrypoint`: The name of the entrypoint script for the PySpark job, as well as any arguments needed to run the job. If your Spark job uses config values from `SparkConf`, you can set them using the [normal Spark syntax](https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties) of `--conf spark.my_config_key=config_value`.

`--connection-string`: The [SQLAlchemy connection string](https://pypi.org/project/PyHive/) used to connect to your Hive instance. Knowing the schemas of the Spark job's input tables is required to compute the job's final output schemas.
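Putting those flags together, a complete Hive-backed invocation might look like the sketch below. The connection string is a placeholder in PyHive's `hive://host:port/database` format; substitute your own Hive host, port, and database, and adjust the entrypoint arguments to match your job.

```bash
# Sketch of a Hive-backed registration; the connection string is a
# placeholder - point it at your own Hive instance.
gable data-asset register --source-type pyspark \
    --project-root . \
    --spark-job-entrypoint "job.py --final_output_table pnw_bookings_30_days" \
    --connection-string "hive://localhost:10000/default"
```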
### CSV
```bash
gable data-asset register --source-type pyspark \
--project-root . \
--spark-job-entrypoint "job.py --final_output_table pnw_bookings_30_days" \
--csv-schema-file schemas.csv
```
`--project-root`: The path to the root of the Python project containing the PySpark job to run.

`--spark-job-entrypoint`: The name of the entrypoint script for the PySpark job, as well as any arguments needed to run the job. If your Spark job uses config values from `SparkConf`, you can set them using the [normal Spark syntax](https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties) of `--conf spark.my_config_key=config_value`.

`--csv-schema-file`: A CSV file containing the schemas of all tables the PySpark job reads from, with the header row
* `schema_table,col_name,col_type`
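For reference, the `schemas.csv` file added in this pull request follows exactly this format; its first few rows look like:

```
schema_table,col_name,col_type
area,subcity_name,STRING
area,area_name,STRING
area,area_id,STRING
```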
2 changes: 1 addition & 1 deletion pyspark/requirements.txt
@@ -2,4 +2,4 @@ numpy==1.26.4
pandas==2.2.1
pyspark==3.5.1
sqlglot==22.0.0
gable==0.10.0a0
gable>=0.10.0a0
62 changes: 62 additions & 0 deletions pyspark/schemas.csv
@@ -0,0 +1,62 @@
schema_table,col_name,col_type
area,subcity_name,STRING
area,area_name,STRING
area,area_id,STRING
area,geohash,STRING
area,city_id,STRING
city,latitude,DOUBLE
city,windows_time_zone,STRING
city,territory,STRING
city,is_capital,BOOLEAN
city,city_code,STRING
city,region,STRING
city,longitude,DOUBLE
city,created_at,TIMESTAMP
city,city_name,STRING
city,city_id,STRING
city,updated_at,TIMESTAMP
city,country_id,STRING
city,parent_city_id,STRING
city,enable_tip,BOOLEAN
city,time_zone,STRING
enterprise_metrics,order_id,STRING
enterprise_metrics,date_id,STRING
enterprise_metrics,booking_code,STRING
enterprise_metrics,booking_channel,STRING
enterprise_metrics,last_group_id,STRING
enterprise_metrics,array_enterprise_reward_id,STRING
enterprise_metrics,product,STRING
booking,city_id,STRING
booking,earning_adjustment_commission_rate,DOUBLE
booking,booking_broadcast_time,TIMESTAMP
booking,receipt_payment_type,STRING
booking,confirm_time,TIMESTAMP
booking,date_id,STRING
booking,reward_id,BIGINT
generic_metrics,driver_longitude,DOUBLE
generic_metrics,payment_type_id,STRING
generic_metrics,confirm_time,TIMESTAMP
generic_metrics,ignored_candidates,STRING
generic_metrics,enterprise_campaign_code,STRING
generic_metrics,cancel_types_used,STRING
generic_metrics,is_soft_allocated,BOOLEAN
generic_metrics,ar_denominator,STRING
generic_metrics,actual_distance_of_trip,DOUBLE
generic_metrics,sender_type,STRING
generic_metrics,cancellation_fees,DOUBLE
generic_metrics,city_id,STRING
generic_metrics,created_at_utc,TIMESTAMP
orders,merchant_order_id,STRING
orders,total_cancels_for_order,BIGINT
orders,pickup_time_to,TIMESTAMP
orders,dropoff_longitude,DOUBLE
orders,is_gpc,BOOLEAN
orders,user_type,STRING
orders,regular_delivery_provider,STRING
orders,city_id,STRING
orders,updated_at,TIMESTAMP
orders,first_allocation_timestamp,TIMESTAMP
orders,cancelled_time,TIMESTAMP
orders,cancel_types_used,STRING
orders,pending_dropoff_time,TIMESTAMP
orders,cod_type,STRING
