Skip to content

Commit

Permalink
Make same changes to s3.
Browse files Browse the repository at this point in the history
  • Loading branch information
Schalk1e committed Jun 12, 2024
1 parent 90c2c98 commit e0628e7
Show file tree
Hide file tree
Showing 7 changed files with 528 additions and 42 deletions.
22 changes: 22 additions & 0 deletions examples/s3_utils/s3_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Smoke-test script for the pyS3 wrapper: exercises each helper against a
# real bucket. Requires BUCKET and PREFIX in the environment, plus the S3
# credentials (S3_KEY/S3_SECRET) that the s3 package reads at import time.
import os

from s3 import pyS3

BUCKET = os.environ["BUCKET"]
PREFIX = os.environ["PREFIX"]

# True when no object exists under PREFIX in BUCKET.
is_empty = pyS3.s3.is_empty(bucket=BUCKET, prefix=PREFIX)

print(f"Is prefix {PREFIX} empty: {is_empty}")

# Most recent modification time across objects under PREFIX.
modified = pyS3.s3.get_last_modified(bucket=BUCKET, prefix=PREFIX)

print(f"Prefix {PREFIX} last modified: {modified}")

# Object keys under PREFIX.
files = pyS3.s3.get_filenames(bucket=BUCKET, prefix=PREFIX)

print(f"Files: {files}")

# NOTE(review): hard-coded path to a specific dataset — confirm this
# belongs in a shared example rather than an env-configured path.
state = pyS3.s3.read("s3://rdw-za/mnch/mnch-turn/contacts/state.parquet")

print(f"State: {state}")
447 changes: 446 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ httpx = "^0.27.0"
[tool.poetry.group.types.dependencies]
pandas-stubs = "^2.2.1.240316"
types-requests = "^2.31.0.20240406"
boto3-stubs = "^1.34.124"

[tool.poetry.group.dev.dependencies]
ruff = "^0.4.7"
Expand All @@ -37,9 +38,6 @@ line-length = 79

[tool.mypy]
# Type-check with `mypy . --explicit-package-bases`; the package ships a py.typed marker.
exclude = [ # Regexes.
'^src/s3/',
]
files = "."

follow_imports = "skip"
Expand Down
45 changes: 16 additions & 29 deletions src/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,28 @@
from boto3 import Session


class S3KeyMissingError(Exception):
pass
class MissingConfig(Exception):
    """Signal that a required configuration environment variable is unset."""


try:
S3_KEY = os.environ["S3_KEY"]
except KeyError as err:
raise S3KeyMissingError(
"Unable to locate S3_KEY in the global environment."
) from err
def config_from_env(key: str) -> str:
    """Fetch a required config value from the global environment.

    Args:
        key: Name of the environment variable to read.

    Returns:
        The variable's value.

    Raises:
        MissingConfig: If the variable is unset or set to an empty string
            (an empty value is deliberately treated as missing).
    """
    # get() already defaults to None; the explicit second argument was
    # redundant. Falsiness covers both "unset" and "empty string".
    value = os.environ.get(key)
    if not value:
        raise MissingConfig(f"{key} not set in the global environment")
    return value


if not all([S3_KEY, S3_SECRET]):
raise S3KeyMissingError(
"""Unable to locate one or both of S3_KEY and S3_SECRET
in the global enviroment."""
)
# Credentials are resolved once at import time; importing this package
# without S3_KEY/S3_SECRET set raises MissingConfig immediately.
S3_KEY = config_from_env("S3_KEY")
S3_SECRET = config_from_env("S3_SECRET")


class Session(Session):
def __init__(self, key, secret):
super().__init__(
aws_access_key_id=key,
aws_secret_access_key=secret,
region_name="af-south-1",
)


session = Session(key=S3_KEY, secret=S3_SECRET)
# Single shared boto3 session, pinned to af-south-1; the package's S3
# helpers all authenticate through it.
session: Session = Session(
    aws_access_key_id=S3_KEY,
    aws_secret_access_key=S3_SECRET,
    region_name="af-south-1",
)

from .main import pyS3 as pyS3
2 changes: 2 additions & 0 deletions src/s3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@


class pyS3:
    """A wrapper class for different s3 submodules.

    Submodules are exposed as class attributes so callers can write
    ``pyS3.s3.read(...)`` without constructing anything themselves.
    """

    # S3 helper bound to the package-wide boto3 session.
    s3 = S3(session)
Empty file added src/s3/py.typed
Empty file.
50 changes: 41 additions & 9 deletions src/s3/s3/s3.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
from collections.abc import Iterator
from typing import Literal

import awswrangler as wr
from attrs import define
from boto3 import Session
from pandas import DataFrame


@define
class S3:
def __init__(self, session):
self._session = session
"""For simple S3 operations required by the ingestion pipeline."""

_session: Session

def is_empty(self, bucket, prefix):
def is_empty(self, bucket: str, prefix: str) -> bool:
    """Return True if no object exists under ``prefix`` in ``bucket``.

    Note the polarity: True means the prefix is empty or absent (the
    previous docstring stated the opposite).
    """
    # MaxKeys=1 is sufficient to detect presence and avoids listing
    # up to 1000 keys just to answer a yes/no question.
    response = self._session.client("s3").list_objects_v2(
        Bucket=bucket, Prefix=prefix, MaxKeys=1
    )

    # ListObjectsV2 omits "Contents" entirely when there are no matches.
    return "Contents" not in response

def get_paginator(self, bucket, prefix):
def get_paginator(self, bucket: str, prefix: str) -> Iterator[dict]:
    """Yield pages of ``list_objects_v2`` responses for a prefix.

    See the AWS reference for the ListObjectsV2 paginator:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/paginator/ListObjectsV2.html
    """
    client = self._session.client("s3")
    return client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=prefix
    )

def get_last_modified(self, bucket, prefix):
def get_last_modified(self, bucket: str, prefix: str) -> str:
"""Get the latest modified time for files in a prefix."""
times = []

pages = self.get_paginator(bucket=bucket, prefix=prefix)
Expand All @@ -28,7 +47,8 @@ def get_last_modified(self, bucket, prefix):

return max(times)

def get_filenames(self, bucket, prefix):
def get_filenames(self, bucket: str, prefix: str) -> list[str]:
"""List filenames in a prefix."""
pages = self.get_paginator(bucket=bucket, prefix=prefix)

filenames = []
Expand All @@ -38,13 +58,25 @@ def get_filenames(self, bucket, prefix):
filenames.append(record["Key"])
return filenames

def read(self, path):
def read(self, path: str) -> DataFrame:
    """Read a parquet file from s3 into a python session.

    ``path`` is passed straight to ``awswrangler.s3.read_parquet``
    (presumably an ``s3://bucket/key`` URI — see the caller in the
    examples); the shared session provides credentials.
    """
    return wr.s3.read_parquet(path, boto3_session=self._session)

def read_csv(self, path):
def read_csv(self, path: str) -> DataFrame:
    """Read a csv file from s3 into a python session.

    Thin wrapper over ``awswrangler.s3.read_csv`` using the shared
    session for credentials.
    """
    return wr.s3.read_csv(path, boto3_session=self._session)

def write(self, df, path, dataset=True, dtype=None, mode=None):
def write(
self,
df: DataFrame,
path: str | None = None,
dataset: bool = True,
dtype: dict[str, str] | None = None,
mode: (
Literal["append", "overwrite", "overwrite_partitions"] | None
) = None,
) -> None:
"""Write a DataFrame to parquet in s3."""
wr.s3.to_parquet(
df=df,
path=path,
Expand Down

0 comments on commit e0628e7

Please sign in to comment.