Skip to content

Commit

Permalink
fix: fall-back to ETag verification when downloading resources for wh…
Browse files Browse the repository at this point in the history
…ich no SHA256 sum exists
  • Loading branch information
paulmueller committed Oct 7, 2024
1 parent 5f64ef0 commit 3bf77e2
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
0.15.1
- fix: fall-back to ETag verification when downloading resources for
which no SHA256 sum exists
- fix: set error message before publishing download job error state
- ref: reduce logging level for SSL verification
0.15.0
- setup: bump dclab from 0.58.7 to 0.60.0 (internal basins support)
Expand Down
63 changes: 55 additions & 8 deletions dcoraid/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,63 @@


@functools.lru_cache(maxsize=2000)
def etagsum(path, part_size=1024**3):
    """Compute the ETag for a file

    The ETag of a resource on DCOR is defined by the way data
    are uploaded to S3. Upload to S3 is done in chunks, the
    number of which are defined by the size of the file.
    The ETag can be computed from the MD5 sum of the MD5 sums [sic]
    of the individual upload parts, followed by a dash "-" and the
    number of upload parts.

    The code for generating the upload URLs can be found at
    :func:`dcor_shared.s3.create_presigned_upload_urls`.

    Parameters
    ----------
    path: str or pathlib.Path
        Path to the file on disk
    part_size: int
        Size of one upload part in bytes. The default (1 GiB) is the
        part size this function has always assumed; only change it
        if the data were uploaded with a different part size.

    Returns
    -------
    etag: str
        For a file that fits into a single part, the plain MD5 hex
        digest of the file. Otherwise, the MD5 hex digest of the
        concatenated binary part digests, followed by "-" and the
        number of parts.

    Raises
    ------
    ValueError
        If `part_size` is not a positive number.

    Notes
    -----
    Results are memoized by `functools.lru_cache` based on the
    arguments only; a file that is modified in-place under the same
    path yields the cached value.
    """
    if part_size < 1:
        raise ValueError(f"`part_size` must be positive, got {part_size}")

    block_size = 1024**2  # read in 1 MiB blocks to keep memory usage low
    path = pathlib.Path(path)
    file_size = path.stat().st_size

    # Ceiling division. An empty file still counts as one (empty) part,
    # so that its ETag is the plain MD5 sum instead of a bogus "...-0".
    num_parts = max(1, -(-file_size // part_size))

    # Compute the binary MD5 digests of the individual upload parts.
    part_digests = []
    with path.open("rb") as fd:
        for _ in range(num_parts):
            part_md5 = hashlib.md5()
            remaining = part_size
            while remaining > 0:
                data = fd.read(min(block_size, remaining))
                if not data:
                    break
                part_md5.update(data)
                remaining -= len(data)
            part_digests.append(part_md5.digest())

    if len(part_digests) == 1:
        # Single-part upload: the ETag is the MD5 sum of the file.
        return part_digests[0].hex()

    # Multi-part upload: combine the binary MD5 digests into the ETag.
    combined = hashlib.md5(b"".join(part_digests))
    return f"{combined.hexdigest()}-{len(part_digests)}"


@functools.lru_cache(maxsize=2000)
def sha256sum(path):
    """Compute the SHA256 hash of a file

    The file is read in blocks of 1 MiB, so it is never loaded into
    memory as a whole.

    Parameters
    ----------
    path: str or pathlib.Path
        Path to the file on disk

    Returns
    -------
    sha256: str
        Hexadecimal SHA256 digest of the file contents

    Notes
    -----
    Results are memoized by `functools.lru_cache` based on `path`
    only; a file that is modified in-place under the same path
    yields the cached value.
    """
    mib = 1024 ** 2
    file_hash = hashlib.sha256()
    with open(path, "rb") as fd:
        # An empty bytes object marks the end of the file.
        while data := fd.read(mib):
            file_hash.update(data)
    return file_hash.hexdigest()

Expand Down
69 changes: 54 additions & 15 deletions dcoraid/download/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import requests

from ..api import errors as api_errors
from ..common import sha256sum, weak_lru_cache
from ..common import etagsum, sha256sum, weak_lru_cache


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -269,8 +269,8 @@ def get_status(self):
except api_errors.APINotFoundError:
# The user likely tried to download the file from a different
# host or an evil admin deleted a file.
self.set_state("error")
self.traceback = traceback.format_exc()
self.set_state("error")
data = {
"state": self.state,
"bytes total": np.nan,
Expand All @@ -294,6 +294,10 @@ def set_state(self, state):
"""
if state not in JOB_STATES:
raise ValueError("Unknown state: '{}'".format(state))
if state == "error":
logger.error(f"Entered error state")
if self.traceback:
logger.error(f"{self.traceback}")
self.state = state

def task_download_resource(self):
Expand Down Expand Up @@ -357,6 +361,7 @@ def task_download_resource(self):
hasher.update(chunk)
bytes_present = self.path_temp.stat().st_size
headers["Range"] = f"bytes={bytes_present}-"

with requests.get(url,
stream=True,
headers=headers,
Expand Down Expand Up @@ -389,7 +394,7 @@ def task_download_resource(self):
+ "'{}'!".format(self.state))

def task_verify_resource(self):
"""Perform SHA256 verification"""
"""Perform ETag/SHA256 verification"""
if self.state == "downloaded":
if self.path.exists() and self.path.is_file():
# This means the download succeeded to `self.path_temp`
Expand All @@ -400,7 +405,7 @@ def task_verify_resource(self):
else: # only verify if we have self.temp_path
self.set_state("verify")
if self.condensed:
# do not perform SHA256 check
# do not perform ETag/SHA256 check
# TODO:
# - Check whether the condensed file can be opened
# with dclab?
Expand All @@ -411,18 +416,52 @@ def task_verify_resource(self):
self.path_temp.rename(self.path)
self.set_state("done")
else:
if self.sha256sum_dl is None:
logger.info(f"Computing SHA256 for {self.path_temp}")
self.sha256sum_dl = sha256sum(self.path_temp)
sha256_expected = self.get_resource_dict()["sha256"]
sha256_actual = self.sha256sum_dl
if sha256_expected != sha256_actual:
self.set_state("error")
self.traceback = (f"SHA256 sum check failed for "
f"{self.path}!")
# perform ETag/SHA256 check
res_dict = self.get_resource_dict()
rid = res_dict["id"]
# Can we verify the SHA256 sum?
sha256_expected = res_dict.get("sha256")
if True or sha256_expected is None:
# The server has not yet computed the SHA256 sum
# of the resource. This can happen when we are
# downloading a resource immediately after it was
# uploaded. Instead of verifying the SHA256 sum,
# verify the ETag of the file.
# TODO: Compute the ETag during download.
logger.info(f"Resource {rid} has no SHA256 set, "
f"falling back to ETag verification.")
import IPython
IPython.embed()
etag_expected = res_dict.get("etag")
if etag_expected is None:
self.traceback = (f"Neither SHA256 nor ETag "
f"defined for resource {rid}")
self.set_state("error")
else:
etag_actual = etagsum(self.path_temp)
if etag_expected != etag_actual:
self.traceback = (
f"ETag verification failed for resource "
f"{rid} ({self.path_temp})")
self.set_state("error")
else:
logger.info(f"ETag verified ({rid})")
self.path_temp.rename(self.path)
self.set_state("done")
else:
self.path_temp.rename(self.path)
self.set_state("done")
if self.sha256sum_dl is None:
logger.info(f"Computing SHA256 for resource {rid} "
f"({self.path_temp})")
self.sha256sum_dl = sha256sum(self.path_temp)
sha256_actual = self.sha256sum_dl
if sha256_expected != sha256_actual:
self.traceback = (f"SHA256 sum check failed for "
f"{self.path}!")
self.set_state("error")
else:
logger.info(f"SHA256 verified ({rid})")
self.path_temp.rename(self.path)
self.set_state("done")
elif self.state != "done": # ignore state "done" [sic!]
# Only issue this warning if the download is not already done.
warnings.warn("Resource verification is only possible when state "
Expand Down

0 comments on commit 3bf77e2

Please sign in to comment.