Skip to content

Commit

Permalink
fix: dance around implementation differences and bugs by S3 object st…
Browse files Browse the repository at this point in the history
…ore services
  • Loading branch information
paulmueller committed Apr 15, 2024
1 parent a9dbc60 commit eb8c146
Showing 1 changed file with 104 additions and 22 deletions.
126 changes: 104 additions & 22 deletions dcoraid/api/s3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import hashlib
import io
import logging
import os
import pathlib
import re
import traceback
from typing import BinaryIO, List
import warnings

import requests

from .errors import S3UploadError


logger = logging.getLogger(__name__)

MiB = 1024 ** 2
GiB = 1024 ** 3

Expand Down Expand Up @@ -205,15 +208,26 @@ def get_etag_from_response(response):
that properly extracts the ETag.
"""
etag = None

# Get the ETag from the response header
for etag_key in [
"etag", # Swift/OpenStack
"ETag", # minio
"etag", # OpenStack / Ceph(Quincy), only for single PUT requests
"ETag", # minio (this is actually the correct name)
"Etag", # other?
"ETAG", # who knows...
]:
etag_str = response.headers.get(etag_key)
if etag_str:
etag = etag_str.strip("'").strip('"')
break

# Get the ETag from the request body
if etag is None:
xml_etag_regexp = re.compile("<ETag>([a-f0-9]*)</ETag>", re.IGNORECASE)
body = response.content.decode("utf-8")
xml_search = xml_etag_regexp.findall(body)
if len(xml_search) == 1: # If it is more than one, could be the parts
etag = xml_search[0]
return etag


Expand All @@ -229,6 +243,12 @@ def requests_put_data_and_get_etag(
):
"""Upload data via a PUT request and return the ETag
Parameters
----------
part_number: int
multipart upload part number, indexing starts at 0
Since this method makes use of :class:`.FilePart` which
supports on-the-fly MD5 sum computation, the ETag is
automatically verified with the MD5 sum of the file.
Expand All @@ -245,13 +265,14 @@ def requests_put_data_and_get_etag(
trcbck = None
resp = None
try:
logger.info(f"Uploading part {part_number + 1} with {put_url}")
resp = requests.put(put_url,
data=fd_part,
timeout=timeout,
)
except BaseException:
trcbck = traceback.format_exc()
warnings.warn(f"Encountered {trcbck} for {put_url}")
logger.warning(f"Encountered Exception for {put_url}:\n{trcbck} ")
continue
else:
# Obtain the ETag from the headers
Expand All @@ -277,12 +298,10 @@ def upload_s3_presigned(
):
"""Upload data to an S3 bucket using presigned URLS
For user convenience, this method performs some sanity check
such as minimum and maximum part size and other sanity checks
before the upload.
In addition, the uploaded data are verified by checking the ETag
returned by the object store.
For user convenience, this method performs some sanity checks
such as minimum and maximum part size. In addition, the uploaded
data are verified by checking the ETag returned by the object store.
See also https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html.
Parameters
----------
Expand All @@ -305,7 +324,7 @@ def upload_s3_presigned(
Returns
-------
etag: str
ETag of the uploaded file in object store
ETag of the uploaded file in the object store
"""
file_size = path.stat().st_size
num_parts = len(upload_urls)
Expand Down Expand Up @@ -423,7 +442,21 @@ def upload_s3_presigned_multipart(
Returns
-------
etag: str
ETag of the uploaded file in object store
ETag of the uploaded file in the object store. The ETag is
computed during the upload in this function and then
compared to the ETag returned via the S3 API. If the server
does not return an ETag, then the expected ETag is still
returned.
Notes
-----
If you are interested in creating the presigned URLs used by this
method, first take a look at the Amazon multipart upload docs:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
You have to create presigned URLs for the "put_object" method and for the
"complete_multipart_upload" method. You can find an implementation
in the DCOR code, method "create_presigned_upload_urls" in:
https://github.com/DCOR-dev/dcor_shared/blob/master/dcor_shared/s3.py
"""
retries = max(1, retries)
path = pathlib.Path(path)
Expand Down Expand Up @@ -470,20 +503,50 @@ def upload_s3_presigned_multipart(
)
except BaseException:
trcbck_compl = traceback.format_exc()
warnings.warn(
logger.warning(
f"Encountered {trcbck_compl} for {complete_multipart_url}")
continue
else:
etag_full = get_etag_from_response(resp_compl)
if etag_full is not None and etag_full == etag_expected:
break
if etag_full is not None:
if etag_full == etag_expected:
# This is the ideal case. Everything is good.
break
else:
logger.warning(f"ETag mismatch, expected {etag_expected}, "
f"got {etag_full}; (retry {ii})")
# The server returned the wrong ETag. We will try again.
continue
else:
# Some servers do not properly return the ETag in the response
# (header), so we cannot rely on this being the case.
# See e.g. https://github.com/ceph/ceph/pull/51447
# What we do instead is verify that there are no error messages
# in the response body:
rbody = resp_compl.content.decode("utf-8")
if rbody.lower().count("<error>"):
logger.warning(
f"Server did not return ETag in the response header "
f"and returned an error message in the body: {rbody}"
)
continue
else:
# We have to assume everything went well.
logger.info("Server did not return ETag in the response, "
"but there is no error message in the body; "
"I assume that the upload is complete.")
break
else:
raise S3UploadError(
f"Not able to complete multipart upload ({retries} retries), "
f"got {resp_compl.headers if resp_compl is not None else None} "
f"with {trcbck_compl}")
err_msg = f"Not able to complete multipart upload ({retries} retries)"
if resp_compl is not None:
err_msg += (f"\nGot response {resp_compl.content} with header "
f"{resp_compl.headers}.")
if trcbck_compl is not None:
err_msg += f"\nGot traceback:\n{trcbck_compl}"

return etag_full
raise S3UploadError(err_msg)

return etag_expected


def upload_s3_presigned_single(
Expand All @@ -494,7 +557,7 @@ def upload_s3_presigned_single(
timeout: float = 27.3,
callback: callable = None
):
"""Upload a single file using a PUT request
"""Upload a single file using a PUT request to a presigned URL
The returned ETag is checked against the MD5 sum of the file.
Expand All @@ -519,7 +582,26 @@ def upload_s3_presigned_single(
-------
etag: str
ETag of the uploaded file in object store
"""
Notes
-----
If you are interested in creating the presigned URL for a PUT request,
take a look at the boto3 documentation here:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_object.html#put-object
For an actual implementation in Python (which is used in DCOR), take a
look at the method `create_presigned_upload_urls` here:
https://github.com/DCOR-dev/dcor_shared/blob/master/dcor_shared/s3.py
For a single PUT request, it looks like this::
psurl = s3_client.generate_presigned_url(
"put_object",
Params={'Bucket': bucket_name,
'Key': object_name,
},
ExpiresIn=expiration,
HttpMethod='PUT',
)
""" # noqa
retries = max(1, retries)
with path.open("rb") as fd:
monitor = UploadMonitorLink(fd=fd,
Expand Down

0 comments on commit eb8c146

Please sign in to comment.