Skip to content

Commit

Permalink
enh: also check S3 storage for orphaned objects in dcor inspect
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Mar 5, 2024
1 parent b588092 commit 24d95d0
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 14 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
0.9.6
0.10.0
- enh: also check S3 storage for orphaned objects in `dcor inspect`
- enh: always encrypt beaker session cookies
- enh: set up cookies more strictly
- enh: always perform CSS rebranding after `dcor develop`
Expand Down
21 changes: 14 additions & 7 deletions dcor_control/cli/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,25 @@ def status():
s3_client, s3_session, s3_resource = s3.get_s3()
buckets = [b["Name"] for b in s3_client.list_buckets()["Buckets"]]
for bucket in buckets:
ctoken = ""
while ctoken is not None:
resp = s3_client.list_objects_v2(Bucket=bucket,
MaxKeys=1000,
ContinuationToken=ctoken)
ctoken = resp.get("NextContinuationToken")
for obj in resp.get("Contents"):
kwargs = {"Bucket": bucket,
"MaxKeys": 500
}
while True:
resp = s3_client.list_objects_v2(**kwargs)

for obj in resp.get("Contents", []):
if obj["Key"].startswith("resource/"):
num_resources += 1
size_resources += obj["Size"]
else:
size_other += obj["Size"]

if not resp.get("IsTruncated"):
break
else:
kwargs["ContinuationToken"] = resp.get(
"NextContinuationToken")

click.echo(f"S3 buckets: {len(buckets)}")
click.echo(f"S3 resources number: {num_resources}")
click.echo(f"S3 resources size: {size_resources/1024**3:.0f} GB")
Expand Down
1 change: 1 addition & 0 deletions dcor_control/cli/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ def inspect(assume_yes=False):
# ask the user whether to search for orphaned files
if assume_yes or click.confirm('Perform search for orphaned files?'):
inspect_mod.check_orphaned_files(assume_yes=assume_yes)
inspect_mod.check_orphaned_s3_artifacts(assume_yes=assume_yes)

click.secho('DONE', fg=u'green', bold=True)
3 changes: 2 additions & 1 deletion dcor_control/inspect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
reload_nginx,
)
from .config_uwsgi import check_uwsgi
from .data_ckan import check_orphaned_files
from .data_ckan_local import check_orphaned_files
from .data_ckan_s3 import check_orphaned_s3_artifacts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ def check_orphaned_files(assume_yes=False):
resource_ids = get_resource_ids()
orphans_processed = [] # list for keeping track of orphans

click.secho("Scanning resource tree for orphaned files...", bold=True)
# Scan CKAN resources
# Scan resources directory on block storage
click.secho("Scanning local resource tree for orphaned files...",
bold=True)
for pp in resources_path.rglob("*/*/*"):
if (pp.is_dir() # directories
or (pp.exists()
Expand All @@ -141,15 +142,16 @@ def check_orphaned_files(assume_yes=False):
request_removal([pp], autocorrect=assume_yes)

# Scan user depot for orphans
click.secho("Scanning user depot tree for orphaned files...", bold=True)
click.secho("Scanning local user depot tree for orphaned files...",
bold=True)
for pp in userdepot_path.rglob("*/*/*/*"):
res_id = pp.name.split("_")[1]
if res_id not in resource_ids and res_id not in orphans_processed:
if assume_yes:
print("Deleting {}".format(pp))
print("Deleting local file {}".format(pp))
delok = True
else:
delok = ask("Delete orphaned file '{}'?".format(pp))
delok = ask("Delete orphaned local file '{}'?".format(pp))
if delok:
pp.unlink()
remove_empty_folders(pp.parent.parent.parent)
Expand Down
171 changes: 171 additions & 0 deletions dcor_control/inspect/data_ckan_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from datetime import datetime, timedelta
from functools import lru_cache
import re
import subprocess as sp

import click

from dcor_shared import get_ckan_config_option, paths, s3

from .data_ckan_local import ask


# First path component ("<artifact>/...") of the S3 object keys that are
# inspected for orphans; objects with any other prefix are left untouched.
ARTIFACT_NAMES = ["condensed", "preview", "resource"]


def check_orphaned_s3_artifacts(assume_yes=False):
    """Check all DCOR buckets for orphaned artifacts

    Buckets that belong to a circle which is not defined in CKAN are
    proposed for complete removal. For every other circle bucket, each
    object below one of the `ARTIFACT_NAMES` prefixes whose resource ID
    is not listed for that circle in CKAN is proposed for removal.

    Parameters
    ----------
    assume_yes: bool
        Whether to remove orphaned buckets and objects without asking
        the user
    """
    # circles that are actually defined in CKAN (set for fast lookup)
    circles_ckan = set(get_circles_ckan())

    # circles for which we have buckets in S3
    # TODO: only consider buckets that are older than a week
    circles_s3 = get_circles_s3(older_than_days=0)

    # bucket name template with an "{organization_id}" placeholder
    bucket_scheme = get_ckan_config_option("dcor_object_store.bucket_name")

    click.secho("Scanning S3 object store for orphaned objects...",
                bold=True)

    for cs3 in circles_s3:
        # `cs3` is only the circle ID; the bucket name must always be
        # derived from the configured naming scheme.
        bucket_name = bucket_scheme.format(organization_id=cs3)

        if cs3 not in circles_ckan:
            click.secho(f"Found S3 bucket for non-existent circle {cs3}")
            request_bucket_removal(bucket_name=bucket_name,
                                   autocorrect=assume_yes)
            continue

        # Iterate through the resources of that circle
        circle_resources = set(list_group_resources_ckan(cs3))
        invalid_artifacts = []
        for object_name in iter_bucket_objects_s3(bucket_name):
            parts = object_name.split("/")
            if parts[0] in ARTIFACT_NAMES:
                # The resource ID is spread over the remaining path
                # components of the object key.
                rid = "".join(parts[1:])
                if rid not in circle_resources:
                    invalid_artifacts.append(object_name)

        if invalid_artifacts:
            # Ask the user whether we should remove these resources for
            # this circle
            request_removal_from_bucket(
                bucket_name=bucket_name,
                objects=invalid_artifacts,
                autocorrect=assume_yes
            )


@lru_cache(maxsize=32)
def get_circles_ckan():
    """Return list of circle IDs defined in CKAN

    The IDs are taken from the output of the `ckan list-circles`
    command: the first whitespace-separated token of every non-empty
    line. The result is cached, so callers should not modify the
    returned list.

    Raises
    ------
    subprocess.CalledProcessError
        If the `ckan` command exits with a non-zero return code
    """
    ckan_ini = paths.get_ckan_config_path()
    # Pass the arguments as a list (no shell), so that a configuration
    # path containing spaces or shell metacharacters is handled correctly.
    output = sp.check_output(
        ["ckan", "-c", str(ckan_ini), "list-circles"]).decode()
    return [line.split()[0] for line in output.split("\n") if line.strip()]


@lru_cache(maxsize=32)
def get_circles_s3(older_than_days=0):
    """Return list of circle IDs defined in S3

    A bucket is considered a circle bucket if its full name matches the
    "dcor_object_store.bucket_name" scheme with a UUID in place of
    "{organization_id}".

    Parameters
    ----------
    older_than_days: int or float
        Only buckets created at least this many days ago are taken
        into account.

    Returns
    -------
    circle_list: list of str
        Circle IDs extracted from the matching bucket names. The result
        is cached, so callers should not modify the returned list.
    """
    s3_client, _, _ = s3.get_s3()
    buckets = s3_client.list_buckets().get("Buckets", [])

    # Compile regexp for identifying circles. The literal parts of the
    # scheme are escaped so that characters like "." are not interpreted
    # as regular expression syntax.
    bucket_scheme = get_ckan_config_option("dcor_object_store.bucket_name")
    uuid_group = (
        r"([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})")
    bucket_regexp = re.compile(uuid_group.join(
        re.escape(part) for part in bucket_scheme.split("{organization_id}")
    ))

    min_age = timedelta(days=older_than_days)
    circle_list = []
    for bdict in buckets:
        creation_date = bdict["CreationDate"]
        # Use the time zone info of the bucket timestamp so that the
        # comparison works for naive and aware datetimes alike.
        threshold = datetime.now(tz=creation_date.tzinfo) - min_age
        if creation_date > threshold:
            # Ignore buckets that are younger than `older_than_days`
            continue
        # The entire bucket name must follow the scheme; a bucket that
        # merely starts with it is not a circle bucket.
        r_match = bucket_regexp.fullmatch(bdict["Name"])
        if r_match is not None:
            circle_list.append(r_match.group(1))
    return circle_list


def iter_bucket_objects_s3(bucket_name):
    """Yield the key of every object in a bucket

    The bucket listing is fetched page by page (100 keys per request)
    via `list_objects_v2`, following the continuation token for as long
    as the response is flagged as truncated.

    Parameters
    ----------
    bucket_name: str
        Name of the bucket to list
    """
    s3_client, _, _ = s3.get_s3()
    request = {"Bucket": bucket_name, "MaxKeys": 100}
    truncated = True
    while truncated:
        page = s3_client.list_objects_v2(**request)
        # "Contents" is missing in the response for an empty listing
        yield from (entry["Key"] for entry in page.get("Contents", []))
        truncated = page.get("IsTruncated")
        if truncated:
            # continue where the previous page ended
            request["ContinuationToken"] = page.get("NextContinuationToken")


def list_group_resources_ckan(group_name_or_id):
    """Return list of resources for a circle or collection

    The resources are taken from the output of the
    `ckan list-group-resources` command, one entry per non-empty line.

    Parameters
    ----------
    group_name_or_id: str
        Name or ID of the circle or collection

    Raises
    ------
    subprocess.CalledProcessError
        If the `ckan` command exits with a non-zero return code
    """
    ckan_ini = paths.get_ckan_config_path()
    # Pass the arguments as a list (no shell), so that neither the
    # configuration path nor the group name is interpreted by a shell.
    output = sp.check_output(
        ["ckan", "-c", str(ckan_ini),
         "list-group-resources", str(group_name_or_id)]).decode()
    return [line.strip() for line in output.split("\n") if line.strip()]


def request_bucket_removal(bucket_name, autocorrect=False):
    """Request (user interaction) the removal of an entire bucket

    Parameters
    ----------
    bucket_name: str
        Name of the bucket to remove
    autocorrect: bool
        Whether to remove the bucket and its objects without asking
        the user
    """
    if autocorrect:
        print(f"Deleting {bucket_name}")
        del_ok = True
    else:
        del_ok = ask(f"Completely remove orphan bucket {bucket_name}?")

    if not del_ok:
        return

    s3_client, _, _ = s3.get_s3()
    # Delete the objects. The keys are collected in a list first, because
    # they are iterated over more than once during removal (a generator
    # would be exhausted after listing them).
    objects = list(iter_bucket_objects_s3(bucket_name))
    if objects:
        request_removal_from_bucket(
            bucket_name=bucket_name,
            objects=objects,
            autocorrect=autocorrect,
        )
    # Delete the bucket only if it is empty now (the user may have
    # declined the removal of its objects).
    if next(iter_bucket_objects_s3(bucket_name), None) is None:
        s3_client.delete_bucket(Bucket=bucket_name)


def request_removal_from_bucket(bucket_name, objects, autocorrect=False):
    """Request (user interaction) and perform removal of a list of objects

    Parameters
    ----------
    bucket_name: str
        The bucket from which to remove the objects
    objects: list of str or iterable of str
        The objects to be removed
    autocorrect: bool
        Whether to remove the objects without asking the user
    """
    # `objects` is iterated over twice (listing and deletion); make sure
    # that one-shot iterables such as generators are not exhausted after
    # the first pass.
    objects = list(objects)
    if not objects:
        # nothing to remove, do not bother the user
        return

    if autocorrect:
        for obj in objects:
            print(f"Deleting {bucket_name}/{obj}")
        del_ok = True
    else:
        del_ok = ask(
            "These objects are not related to any existing resource: "
            + "".join([f"\n - {bucket_name}/{obj}" for obj in objects])
            + "\nDelete these orphaned objects?")

    if del_ok:
        s3_client, _, _ = s3.get_s3()
        for obj in objects:
            s3_client.delete_object(Bucket=bucket_name,
                                    Key=obj)

0 comments on commit 24d95d0

Please sign in to comment.