
Commit

ref: use multipart upload for direct upload to S3
paulmueller committed Apr 12, 2024
1 parent 3a85965 commit a9dbc60
Showing 6 changed files with 585 additions and 53 deletions.
73 changes: 28 additions & 45 deletions dcoraid/api/dataset.py
@@ -9,13 +9,11 @@
 import requests
 from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
 
-from .errors import APIBadRequest, APIConflictError, APINotFoundError
+from .errors import (
+    APIBadRequest, APIConflictError, APINotFoundError, NoS3UploadAvailableError
+)
 from .ckan_api import CKANAPI
-
-
-class NoS3UploadAvailableError(Exception):
-    """Used for identifying DCOR servers that don't support direct S3 upload"""
-    pass
+from .s3_api import upload_s3_presigned
 
 
 def dataset_activate(dataset_id, api):
@@ -290,58 +288,43 @@ def resource_add_upload_direct_s3(
     if logger is not None:
         logger.info(f"Commencing S3 upload of {upload_id}")
 
-    # retrieve the organization ID
+    resource_path = pathlib.Path(resource_path)
+    # retrieve the organization ID and file size
     org_id = get_organization_id_for_dataset(api=api, dataset_id=dataset_id)
+    file_size = resource_path.stat().st_size
 
     # retrieve the upload URL and the data fields
     try:
-        upload_info = api.get("resource_upload_s3_url",
-                              organization_id=org_id)
+        upload_info = api.get(
+            "resource_upload_s3_urls",
+            organization_id=org_id,
+            file_size=file_size,
+        )
     except APIBadRequest:
         raise NoS3UploadAvailableError(f"Server {api.server} does not yet "
                                        f"support direct upload to S3")
 
-    # presigned URL for the upload
-    ps_url = upload_info["url"]
-    # special fields for S3
-    fields = upload_info["fields"]
-
-    # Perform the upload to S3
-    with resource_path.open("rb") as fd:
-        fields["file"] = (fields["key"], fd)
-        e = MultipartEncoder(fields=fields)
-        m = MultipartEncoderMonitor(e, monitor_callback)
-        # Increase the read size to speed-up upload (the default chunk
-        # size for uploads in urllib is 8k, which results in a lot of
-        # Python code being involved in uploading a 20GB file; setting
-        # the chunk size to 4MB should increase the upload speed):
-        # https://github.com/requests/toolbelt/issues/75
-        # #issuecomment-237189952
-        m._read = m.read
-        m.read = lambda size: m._read(4 * 1024 * 1024)
-
-        # perform the actual upload
-        hrep = requests.post(
-            ps_url,
-            data=m,
-            headers={'Content-Type': m.content_type},
-            verify=True,  # verify SSL connection
-            timeout=timeout,  # timeout to avoid freezing
-        )
-
-        if hrep.status_code != 204:
-            raise ValueError(f"Upload of {upload_id} failed with "
-                             f"{hrep.status_code}: {hrep.reason}")
+    logger.info(
+        f"We have {len(upload_info['upload_urls'])} upload parts for "
+        f"a resource of size {file_size/1024**2:.2f} MiB of {upload_id}")
+
+    upload_s3_presigned(
+        path=resource_path,
+        upload_urls=upload_info["upload_urls"],
+        complete_url=upload_info["complete_url"],
+        callback=monitor_callback,
+        timeout=timeout,
+    )
 
     # The upload succeeded, now add the resource to the CKAN database.
     revise_dict = {
         "match": {"id": dataset_id},
-        "update": {"resources": [{"id": upload_info["resource_id"],
-                                  "name": resource_name,
-                                  "s3_available": True,
-                                  }
-                                 ]
-                   }
+        # Add a new resource (__extend on flattened key)
+        "update__resources__extend": [{"id": upload_info["resource_id"],
+                                       "name": resource_name,
+                                       "s3_available": True,
+                                       }
+                                      ]
     }
     api.post("package_revise", revise_dict)
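
The new helper upload_s3_presigned lives in dcoraid/api/s3_api.py, which is part of this commit but not shown in this excerpt. Judging only from the call site above, a minimal sketch could look like the following; the part size, the callback signature, and the XML completion step are assumptions about the standard presigned multipart-upload protocol, not the committed code:

# Hypothetical sketch of s3_api.upload_s3_presigned; part size,
# callback semantics and the completion step are assumptions.
import pathlib

import requests

from .errors import S3UploadError

GiB = 1024**3


def upload_s3_presigned(path, upload_urls, complete_url, callback=None,
                        timeout=None, part_size=GiB):
    """Upload a file in parts to presigned S3 URLs, then complete it"""
    path = pathlib.Path(path)
    etags = []
    with path.open("rb") as fd:
        # The part size must match what the server assumed when it
        # generated `len(upload_urls)` presigned part URLs (assumption).
        for part_number, psurl in enumerate(upload_urls, start=1):
            resp = requests.put(psurl,
                                data=fd.read(part_size),
                                timeout=timeout)
            if not resp.ok:
                raise S3UploadError(
                    f"Part {part_number} failed with "
                    f"{resp.status_code}: {resp.reason}")
            # S3 returns an ETag for every part; the complete list is
            # required to finalize the multipart upload.
            etags.append(resp.headers["ETag"])
            if callback is not None:
                callback(fd.tell())  # bytes uploaded so far (assumed)
    # CompleteMultipartUpload: post the part list as XML to the
    # presigned completion URL.
    parts = "".join(
        f"<Part><PartNumber>{nn}</PartNumber><ETag>{etag}</ETag></Part>"
        for nn, etag in enumerate(etags, start=1))
    resp = requests.post(
        complete_url,
        data=f"<CompleteMultipartUpload>{parts}</CompleteMultipartUpload>",
        timeout=timeout)
    if not resp.ok:
        raise S3UploadError(
            f"Completing the upload failed with "
            f"{resp.status_code}: {resp.reason}")

Splitting the transfer into parts is what makes large uploads practical: a single presigned PUT is capped at 5 GB by S3, whereas a multipart upload supports objects up to 5 TiB and lets a failed part be retried without restarting the whole transfer. Separately, the update__resources__extend key in revise_dict uses CKAN's flattened-key syntax for package_revise, which appends the new resource to the dataset's resource list instead of replacing the whole list.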

10 changes: 10 additions & 0 deletions dcoraid/api/errors.py
@@ -41,3 +41,13 @@ class APIOutdatedError(APIError):
 class NoAPIKeyError(APIError):
     """DCOR does not have an API key"""
     pass
+
+
+class NoS3UploadAvailableError(BaseException):
+    """Used for identifying DCOR servers that don't support direct S3 upload"""
+    pass
+
+
+class S3UploadError(BaseException):
+    """raised when an upload to S3 failed"""
+    pass
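
Note that both new classes derive from BaseException rather than Exception, so a generic "except Exception" handler will not swallow them; callers must catch them by name. A hedged sketch of how calling code might use NoS3UploadAvailableError to fall back to the classic CKAN upload route (resource_add_upload_legacy is a hypothetical name, not part of this commit):

from .dataset import resource_add_upload_direct_s3
from .errors import NoS3UploadAvailableError


def resource_add(dataset_id, resource_path, api, **kwargs):
    """Upload a resource, preferring direct S3 upload when available"""
    try:
        resource_add_upload_direct_s3(dataset_id=dataset_id,
                                      resource_path=resource_path,
                                      api=api, **kwargs)
    except NoS3UploadAvailableError:
        # The server predates the S3 upload API (it answered the
        # "resource_upload_s3_urls" request with APIBadRequest), so
        # use the classic upload endpoint instead (hypothetical).
        resource_add_upload_legacy(dataset_id=dataset_id,
                                   resource_path=resource_path,
                                   api=api, **kwargs)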
