reg: correctly implement ETag verification (bad commit 3bf77e2)
paulmueller committed Oct 7, 2024
1 parent 5235856 commit c354b7b
Showing 4 changed files with 138 additions and 52 deletions.
61 changes: 53 additions & 8 deletions dcoraid/api/s3_api.py
@@ -200,6 +200,51 @@ def assemble_complete_multipart_xml(parts_etags):
return s


def compute_upload_part_parameters(file_size):
"""Given a file of certain size, return sizes and number of parts
Parameters
----------
file_size: int
file size in bytes
Returns
-------
parms: dict
dictionary with the keys:
- "num_parts": number of parts of the upload
- "part_size": size of the parts (except for the last part)
- "part_size_last": size of the last part
- "file_size": same as input parameter
"""
gib = 1024**3

# Compute number of parts
if file_size % gib == 0:
num_parts = file_size // gib
else:
num_parts = file_size // gib + 1

# Compute part file size
if file_size % num_parts == 0:
# Every part has the same size, since the file size is
# a multiple of the number of parts.
part_size = file_size // num_parts
else:
# The last part is a few bytes smaller than the other parts.
part_size = file_size // num_parts + 1

part_size_last = file_size - part_size * (num_parts - 1)

return {
"num_parts": num_parts,
"part_size": part_size,
"part_size_last": part_size_last,
"file_size": file_size,
}
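A quick sanity check of the new helper (illustration only, not part of this commit; the import path mirrors the test file added below). For a file slightly larger than 3 GiB, four parts are expected:

from dcoraid.api import s3_api

GiB = 1024**3
parms = s3_api.compute_upload_part_parameters(3 * GiB + 1)
assert parms["num_parts"] == 4
assert parms["part_size"] == 3 * GiB // 4 + 1
# the last part absorbs the remainder and is slightly smaller
assert parms["part_size_last"] == parms["file_size"] - 3 * parms["part_size"]
assert 3 * parms["part_size"] + parms["part_size_last"] == 3 * GiB + 1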


def get_etag_from_response(response):
"""Given a response from a PUT or POST request, extract the ETag
@@ -329,14 +374,14 @@ def upload_s3_presigned(
file_size = path.stat().st_size
num_parts = len(upload_urls)

if file_size % num_parts == 0:
part_size = file_size // num_parts
else:
part_size = file_size // num_parts + 1

final_part_size = file_size - part_size * (num_parts - 1)
parms = compute_upload_part_parameters(file_size)
if num_parts != parms["num_parts"]:
raise ValueError(f"Expected {parms['num_parts']} upload URLs, "
f"got {num_parts}")
part_size = parms["part_size"]
part_size_last = parms["part_size_last"]

if final_part_size <= 0:
if part_size_last <= 0:
# Something went wrong. If we attempt to upload the file
# with the given number of parts, then there is no data
# left (at least) for the final part.
@@ -357,7 +402,7 @@ def upload_s3_presigned(
f"uploading a single part to the S3 object storage. Please "
f"increase the number of parts for your upload.")

if final_part_size < (5 * MiB) and num_parts > 1:
if part_size_last < (5 * MiB) and num_parts > 1:
raise ValueError(
f"The size for one upload part, given the file size of "
f"{file_size / 1024:.1f} kiB and the number of parts "
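Since part_size and part_size_last now both come from compute_upload_part_parameters, the sizes of all upload parts can be reconstructed from the returned dictionary. A minimal sketch (the helper part_sizes below is an illustrative assumption, not part of the code base):

def part_sizes(parms):
    """Sizes of the individual upload parts implied by the parameters."""
    return ([parms["part_size"]] * (parms["num_parts"] - 1)
            + [parms["part_size_last"]])

# sanity check: the part sizes always add up to the file size
# sum(part_sizes(parms)) == parms["file_size"]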
30 changes: 14 additions & 16 deletions dcoraid/common.py
@@ -5,6 +5,9 @@

import requests

from .api import s3_api


ConnectionTimeoutErrors = (ConnectionError,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout)
@@ -25,27 +28,22 @@ def etagsum(path):
The code for generating the upload URLs can be found at
:func:`dcor_shared.s3.create_presigned_upload_urls`.
"""
gib = 1024**3
mib = 1024**2
path = pathlib.Path(path)
file_size = path.stat().st_size

if file_size % gib == 0:
num_parts = file_size // gib
else:
num_parts = file_size // gib + 1
parms = s3_api.compute_upload_part_parameters(path.stat().st_size)

# Compute the MD5 sums of the individual upload parts.
md5_sums = []
with path.open("rb") as fd:
for ii in range(num_parts):
cur_md5 = hashlib.md5()
for jj in range(1024): # 1GB chunk = 1024 * 1MB chunk
data = fd.read(mib)
if not data:
break
cur_md5.update(data)
md5_sums.append(cur_md5.hexdigest())
for ii in range(parms["num_parts"]):
fd_part = s3_api.FilePart(file_object=fd,
part_number=ii,
part_size=parms["part_size"],
file_size=parms["file_size"],
)
part_hash = hashlib.md5()
while data := fd_part.read(s3_api.MiB):
part_hash.update(data)
md5_sums.append(part_hash.hexdigest())

if len(md5_sums) == 1:
etag = md5_sums[0]
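The remainder of etagsum is collapsed in this view. Presumably it follows the usual S3 multipart-ETag convention, sketched below for reference (an assumption, not a copy of the hidden code): the ETag of a multipart object is the MD5 of the concatenated binary per-part digests, suffixed with the number of parts.

import hashlib

def multipart_etag(md5_sums):
    # single-part uploads: the ETag is simply the MD5 hex digest
    if len(md5_sums) == 1:
        return md5_sums[0]
    # multipart uploads: MD5 over the concatenated binary digests, plus "-N"
    digest = hashlib.md5(
        b"".join(bytes.fromhex(h) for h in md5_sums)).hexdigest()
    return f"{digest}-{len(md5_sums)}"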
57 changes: 29 additions & 28 deletions dcoraid/download/job.py
Expand Up @@ -295,7 +295,7 @@ def set_state(self, state):
if state not in JOB_STATES:
raise ValueError("Unknown state: '{}'".format(state))
if state == "error":
logger.error(f"Entered error state")
logger.error("Entered error state")
if self.traceback:
logger.error(f"{self.traceback}")
self.state = state
@@ -349,6 +349,8 @@ def task_download_resource(self):
# Do the things to do and watch self.state while doing so
url = self.get_resource_url()
headers = copy.deepcopy(self.api.headers)

bytes_present = 0
if self.path_temp.exists():
# Resume a previous download.
# We have to update the hash of the current file with
@@ -362,30 +364,31 @@
bytes_present = self.path_temp.stat().st_size
headers["Range"] = f"bytes={bytes_present}-"

with requests.get(url,
stream=True,
headers=headers,
verify=self.api.verify,
timeout=29.9) as r:
r.raise_for_status()
with self.path_temp.open('ab') as f:
chunk_size = 1024 * 1024
for chunk in r.iter_content(chunk_size=chunk_size):
# If you have chunk encoded response uncomment
# if and set chunk_size parameter to None.
f.write(chunk)
self.file_bytes_downloaded += len(chunk)
# Compute the SHA256 sum while downloading.
# This is faster than reading everything
# again after the download but has the slight
# risk of losing data in memory before it got
# written to disk. A risk we are going to take
# for the sake of performance.
if (self.sha256sum_dl is None
# We do not verify SHA256 for condensed
and not self.condensed):
hasher.update(chunk)
self.sha256sum_dl = hasher.hexdigest()
if bytes_present != self.file_size:
with requests.get(url,
stream=True,
headers=headers,
verify=self.api.verify,
timeout=29.9) as r:
r.raise_for_status()
with self.path_temp.open('ab') as f:
mib = 1024 * 1024
for chunk in r.iter_content(chunk_size=mib):
f.write(chunk)
self.file_bytes_downloaded += len(chunk)
# Compute the SHA256 sum while downloading.
# This is faster than reading everything
# again after the download but has the
# slight risk of losing data in memory
# before it got written to disk. A risk we
# are going to take for the sake of
# performance.
if (self.sha256sum_dl is None
# We do not verify SHA256
# for condensed data.
and not self.condensed):
hasher.update(chunk)
self.sha256sum_dl = hasher.hexdigest()
self.end_time = time.perf_counter()
self.set_state("downloaded")
else:
@@ -421,7 +424,7 @@ def task_verify_resource(self):
rid = self.resource_id
# Can we verify the SHA256 sum?
sha256_expected = res_dict.get("sha256")
if True or sha256_expected is None:
if sha256_expected is None:
# The server has not yet computed the SHA256 sum
# of the resource. This can happen when we are
# downloading a resource immediately after it was
@@ -430,8 +433,6 @@
# TODO: Compute the ETag during download.
logger.info(f"Resource {rid} has no SHA256 set, "
f"falling back to ETag verification.")
import IPython
IPython.embed()
etag_expected = res_dict.get("etag")
if etag_expected is None:
self.traceback = (f"Neither SHA256 nor ETag "
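With the leftover debugging code (import IPython; IPython.embed()) and the forced "if True or ..." removed, the ETag fallback in task_verify_resource can now actually run. Roughly, it compares the resource's etag field against the ETag of the downloaded file; a sketch under the assumption that dcoraid.common.etagsum is used for the comparison (not the literal implementation):

from dcoraid.common import etagsum

etag_expected = res_dict.get("etag")
if etag_expected is not None:
    etag_actual = etagsum(self.path_temp)
    if etag_actual != etag_expected:
        self.traceback = (f"ETag verification failed for resource {rid}: "
                          f"{etag_actual} != {etag_expected}")
        self.set_state("error")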
42 changes: 42 additions & 0 deletions tests/test_api_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from dcoraid.api import s3_api


import pytest

GiB = 1024**3
MiB = 1024**2


@pytest.mark.parametrize("file_size,expected", [
[100,
{"num_parts": 1,
"part_size": 100,
"part_size_last": 100,
"file_size": 100,
}
],
[2 * GiB,
{"num_parts": 2,
"part_size": GiB,
"part_size_last": GiB,
"file_size": 2*GiB,
}
],
[2 * GiB - 2,
{"num_parts": 2,
"part_size": GiB - 1,
"part_size_last": GiB - 1,
"file_size": 2 * GiB - 2,
}
],
[GiB + 2,
{"num_parts": 2,
"part_size": GiB // 2 + 1,
"part_size_last": GiB // 2 + 1,
"file_size": GiB + 2,
}
],
])
def test_compute_upload_part_parameters(file_size, expected):
actual = s3_api.compute_upload_part_parameters(file_size)
assert expected == actual
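The new tests are pure unit tests for compute_upload_part_parameters and require no running DCOR instance; they can be executed directly with pytest (for example: python -m pytest tests/test_api_s3.py).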
