Commit a2c5235

tyapochkinskrawcz authored and committed

Add an example of using Hamilton in AWS Glue

1 parent 838a07a

File tree

6 files changed: +167 −0 lines changed
examples/aws/glue/README.md (+88 lines)
[AWS Glue](https://aws.amazon.com/glue/) is a serverless data integration service. This guide demonstrates deploying a "hello-world" [processing job](https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html) using Hamilton functions on AWS Glue.

## Prerequisites

- **AWS CLI setup**: Make sure the AWS CLI is set up on your machine. If you haven't done this yet, no worries! You can follow the [Quick Start guide](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-quickstart.html) for easy setup instructions.

## Step-by-Step Guide

### 1. Build a wheel with the Hamilton functions

AWS Glue jobs run a single Python script, but you can include external code (like our Hamilton functions) by adding it as a Python wheel. So let's package our code and get it ready for action.

- **Install the `build` package:**

  This command installs the `build` package, which we'll use to create the Python wheel:

  ```shell
  pip install build
  ```

- **Build the Python wheel:**

  ```shell
  cd app && python -m build --wheel --skip-dependency-check && cd ..
  ```
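Before uploading, you can sanity-check what went into the wheel with Python's built-in `zipfile` module (the path below assumes the build command above succeeded; only `hamilton_functions/` files should be listed):

```shell
python -m zipfile -l app/dist/hamilton_functions-0.1-py3-none-any.whl
```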
### 2. Upload the necessary files to S3

- **Upload the wheel file to S3:**

  Replace `<YOUR_PATH_TO_WHL>` with your specific S3 bucket and path:

  ```shell
  aws s3 cp app/dist/hamilton_functions-0.1-py3-none-any.whl s3://<YOUR_PATH_TO_WHL>/hamilton_functions-0.1-py3-none-any.whl
  ```

- **Upload the main Python script to S3:**

  Replace `<YOUR_PATH_TO_SCRIPT>` with your specific S3 bucket and path:

  ```shell
  aws s3 cp processing.py s3://<YOUR_PATH_TO_SCRIPT>/processing.py
  ```

- **Upload the input data to S3:**

  Replace `<YOUR_PATH_TO_INPUT_DATA>` with your specific S3 bucket and path:

  ```shell
  aws s3 cp data/input_table.csv s3://<YOUR_PATH_TO_INPUT_DATA>
  ```

### 3. Create a simple role for AWS Glue job execution

- **Create the role:**

  ```shell
  aws iam create-role --role-name GlueProcessorRole --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'
  ```

- **Attach policies to the role:**

  Here we grant full access to S3 as an example. For production environments, it's important to restrict access appropriately.

  ```shell
  aws iam attach-role-policy --role-name GlueProcessorRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
  aws iam attach-role-policy --role-name GlueProcessorRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
  ```

### 4. Create and run the job

- **Create the job:**

  Ensure all paths are replaced with the actual ones:

  ```shell
  aws glue create-job --name test_hamilton_script --role GlueProcessorRole --command '{"Name" : "pythonshell", "PythonVersion": "3.9", "ScriptLocation" : "s3://<YOUR_PATH_TO_SCRIPT>/processing.py"}' --max-capacity 0.0625 --default-arguments '{"--extra-py-files" : "s3://<YOUR_PATH_TO_WHL>/hamilton_functions-0.1-py3-none-any.whl", "--additional-python-modules" : "sf-hamilton"}'
  ```

- **Run the job:**

  Ensure all paths are replaced with the actual ones:

  ```shell
  aws glue start-job-run --job-name test_hamilton_script --arguments '{"--input-table" : "s3://<YOUR_PATH_TO_INPUT_DATA>", "--output-table" : "s3://<YOUR_PATH_TO_OUTPUT_DATA>"}'
  ```

Once you've run the job, you should see an output file at `s3://<YOUR_PATH_TO_OUTPUT_DATA>`.
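To check on the job after starting it, the AWS CLI can list recent runs and their statuses, and show the output once a run succeeds (same placeholder path as above):

```shell
# Show the most recent run of the job, including its JobRunState
aws glue get-job-runs --job-name test_hamilton_script --max-results 1

# After a successful run, the output file should appear here
aws s3 ls s3://<YOUR_PATH_TO_OUTPUT_DATA>
```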

examples/aws/glue/app/hamilton_functions/__init__.py

Whitespace-only changes.

examples/aws/glue/app/hamilton_functions/functions.py (+38 lines)
```python
import pandas as pd

from hamilton.function_modifiers import extract_columns


@extract_columns("spend", "signups")
def raw_table(input_table: pd.DataFrame) -> pd.DataFrame:
    return input_table


def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3).mean()


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to spend."""
    return spend / signups


def spend_mean(spend: pd.Series) -> float:
    """Shows function creating a scalar. In this case it computes the mean of the entire column."""
    return spend.mean()


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Shows function that takes a scalar. In this case to zero mean spend."""
    return spend - spend_mean


def spend_std_dev(spend: pd.Series) -> float:
    """Function that computes the standard deviation of the spend column."""
    return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """Function showing one way to make spend have zero mean and unit variance."""
    return spend_zero_mean / spend_std_dev
```
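To see what these transforms produce without a Glue job, here is a pandas-only sketch that mirrors the functions above on the sample rows from `data/input_table.csv` (plain pandas for illustration; in the real pipeline Hamilton's driver wires the functions together from their parameter names):

```python
import pandas as pd

# Sample input, taken from data/input_table.csv
df = pd.DataFrame({"signups": [1, 10, 50, 100, 200, 400],
                   "spend": [10, 10, 20, 40, 40, 50]})

# avg_3wk_spend: rolling 3 week average of spend (first two rows are NaN)
avg_3wk_spend = df["spend"].rolling(3).mean()

# spend_per_signup: cost per signup
spend_per_signup = df["spend"] / df["signups"]

# spend_zero_mean_unit_variance: standardize spend via the scalar mean/std nodes
spend_mean = df["spend"].mean()
spend_std_dev = df["spend"].std()
spend_zero_mean_unit_variance = (df["spend"] - spend_mean) / spend_std_dev

print(spend_per_signup.tolist())  # [10.0, 1.0, 0.4, 0.4, 0.2, 0.125]
```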

examples/aws/glue/app/setup.py (+3 lines)

```python
from setuptools import setup

setup(name="hamilton_functions", version="0.1", packages=["hamilton_functions"])
```
examples/aws/glue/data/input_table.csv (+7 lines)

```
signups,spend
1,10
10,10
50,20
100,40
200,40
400,50
```

examples/aws/glue/processing.py (+31 lines)

```python
import sys

import pandas as pd

# awsglue is installed in the AWS Glue worker environment
from awsglue.utils import getResolvedOptions
from hamilton_functions import functions

from hamilton import driver

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["input-table", "output-table"])

    df = pd.read_csv(args["input_table"])

    dr = driver.Driver({}, functions)

    inputs = {"input_table": df}

    output_columns = [
        "spend",
        "signups",
        "avg_3wk_spend",
        "spend_per_signup",
        "spend_zero_mean_unit_variance",
    ]

    # DAG execution
    df_result = dr.execute(output_columns, inputs=inputs)
    df_result.to_csv(args["output_table"])
```
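One detail worth noting: the script registers the option as `input-table` but reads `args["input_table"]`. That matches the argparse convention of turning dashes in option names into underscores in the resulting keys. Since `awsglue` only exists on Glue workers, a hypothetical local stand-in sketched with plain `argparse` shows the same behaviour:

```python
import argparse

# A local stand-in for awsglue's getResolvedOptions, which is only available
# inside the Glue worker environment. Options registered as "input-table"
# come back under the key "input_table" (dashes become underscores).
def resolved_options(argv, option_names):
    parser = argparse.ArgumentParser()
    for name in option_names:
        parser.add_argument("--" + name, required=True)
    parsed, _ = parser.parse_known_args(argv)
    return vars(parsed)

args = resolved_options(
    ["--input-table", "s3://bucket/in.csv", "--output-table", "s3://bucket/out.csv"],
    ["input-table", "output-table"],
)
print(args["input_table"])  # s3://bucket/in.csv
```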
