Commit
Add delete by prefix method
gitcarbs committed Jan 9, 2025
1 parent 0078eaa commit a644e76
Showing 3 changed files with 47 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,7 @@
6.1.1
-------------------
- Add delete by prefix method

6.1.0 (unreleased)
-------------------
- Updating cloud vacuum support to standardize implementation
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
6.1.0
6.1.1
59 changes: 42 additions & 17 deletions guillotina_gcloudstorage/storage.py
@@ -64,7 +64,9 @@ class IGCloudFileStorageManager(IExternalFileStorageManager):
SERVICE_ACCOUNT = "default"

SCOPES = ["https://www.googleapis.com/auth/devstorage.read_write"]
UPLOAD_URL = "https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?uploadType=resumable" # noqa
UPLOAD_URL = (
"https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?uploadType=resumable"
) # noqa
OBJECT_BASE_URL = "https://www.googleapis.com/storage/v1/b"
CHUNK_SIZE = 524288
MAX_RETRIES = 5
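For orientation, a minimal sketch of how the reformatted template above is consumed; the bucket name is a placeholder, and the 256 KiB chunk granularity is a general Google Cloud Storage requirement for resumable uploads rather than something stated in this diff:

```python
UPLOAD_URL = (
    "https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?uploadType=resumable"
)
CHUNK_SIZE = 524288  # 2 * 262144; resumable upload chunks must be multiples of 256 KiB

# "example-bucket" is a placeholder, not a bucket used by this package.
endpoint = UPLOAD_URL.format(bucket="example-bucket")
print(endpoint)
```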
@@ -242,9 +244,32 @@ async def start(self, dm):
current_upload=0, resumable_uri=resumable_uri, upload_file_id=upload_file_id
)

@backoff.on_exception(backoff.expo, RETRIABLE_EXCEPTIONS, max_tries=10)
async def delete_by_prefix(self, uri):
util = get_utility(IGCloudBlobStore)

if uri and "::" in uri:
prefix = uri.split("::")[0]
blobs, _ = await util.get_blobs(prefix=prefix)
candidate_keys = [blob.name for blob in blobs if "::" in blob.name]

if not candidate_keys:
return False

success_keys, failure_keys = await util.delete_blobs(
keys=candidate_keys, bucket_name=await util.get_bucket_name()
)
if failure_keys:
raise GoogleCloudException(f"Failed to delete {failure_keys[0]}")

return True
else:
raise AttributeError("No valid uri")
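As a quick sketch of the contract introduced above: stored object keys follow a "<prefix>::<upload id>" layout, so splitting the file's uri on "::" yields the prefix shared by every upload of that field, and the delete() method further down delegates to this call. The uri and the manager below are illustrative placeholders, not values taken from the repository:

```python
# Illustrative key only; it mimics the "<prefix>::<upload id>" layout that
# delete_by_prefix assumes when it splits on "::".
uri = "zone/container-uid/field-uid::upload-uid"
prefix = uri.split("::")[0]
assert prefix == "zone/container-uid/field-uid"

# Inside a request, a storage manager (a hypothetical instance called `manager`
# here) would be exercised roughly like this:
#
#   deleted = await manager.delete_by_prefix(uri)
#   # True  -> every blob whose name starts with the prefix was deleted
#   # False -> no "<prefix>::<id>" blobs were found under that prefix
#   # GoogleCloudException -> at least one batched deletion failed
```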

@backoff.on_exception(backoff.expo, RETRIABLE_EXCEPTIONS, max_tries=10)
async def delete_upload(self, uri):
util = get_utility(IGCloudBlobStore)

if uri is not None:
url = "{}/{}/o/{}".format(
OBJECT_BASE_URL, await util.get_bucket_name(), quote_plus(uri)
@@ -404,7 +429,7 @@ async def copy(self, to_storage_manager, to_dm):

async def delete(self):
file = self.field.get(self.field.context or self.context)
await self.delete_upload(file.uri)
return await self.delete_by_prefix(file.uri)


@implementer(IGCloudFileField)
@@ -617,36 +642,37 @@ async def generate_download_signed_url(
request_args["credentials"] = credentials
return blob.generate_signed_url(**request_args)


async def get_blobs(self, page_token: Optional[str] = None, prefix=None, max_keys=1000) -> Tuple[List[BlobMetadata], str]:
async def get_blobs(
self, page_token: Optional[str] = None, prefix=None, max_keys=1000
) -> Tuple[List[BlobMetadata], str]:
"""
Get a page of items from the bucket
"""
page = await self.iterate_bucket_page(page_token, prefix)
blobs = [
BlobMetadata(
name = item.get("name"),
bucket = item.get("bucket"),
createdTime = parse(item.get("timeCreated")),
size = int(item.get("size"))
name=item.get("name"),
bucket=item.get("bucket"),
createdTime=parse(item.get("timeCreated")),
size=int(item.get("size")),
)
for item
in page.get("items", [])
for item in page.get("items", [])
]
next_page_token = page.get("nextPageToken", None)

return blobs, next_page_token
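Since get_blobs returns a single page plus a continuation token, a caller that wants every object under a prefix has to loop on nextPageToken. A minimal sketch under that assumption; the module path for IGCloudBlobStore is inferred from the package layout, and the helper name is illustrative:

```python
from guillotina.component import get_utility
from guillotina_gcloudstorage.interfaces import IGCloudBlobStore  # assumed module path


async def list_blobs_with_prefix(prefix):
    # Drain every page by feeding the returned nextPageToken back in.
    util = get_utility(IGCloudBlobStore)
    blobs, token = await util.get_blobs(prefix=prefix)
    while token:
        more, token = await util.get_blobs(page_token=token, prefix=prefix)
        blobs.extend(more)
    return blobs
```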


async def delete_blobs(self, keys: List[str], bucket_name: Optional[str] = None) -> Tuple[List[str], List[str]]:
async def delete_blobs(
self, keys: List[str], bucket_name: Optional[str] = None
) -> Tuple[List[str], List[str]]:
"""
Deletes a batch of files. Returns successful and failed keys.
"""
client = self.get_client()

# Example stored uri, showing the gs://<bucket>/<prefix>::<upload id> layout:
# gs://onna.storage-dev-1173.atlasense.com/onna/43af73f190074ee098a5288174cbdf2c/1fe50b288a5b4eb2916abb4354612bb3::9ecf1bfdc1554db4ab27a2a9e1757852
if not bucket_name:
bucket_name = await self.get_bucket_name()

bucket = client.bucket(bucket_name)

with client.batch(raise_exception=False) as batch:
@@ -656,15 +682,14 @@ async def delete_blobs(self, keys: List[str], bucket_name: Optional[str] = None)
success_keys = []
failed_keys = []
for idx, response in enumerate(batch._responses):
key=keys[idx]
key = keys[idx]
if 200 <= response.status_code < 300:
success_keys.append(key)
else:
failed_keys.append(key)

return success_keys, failed_keys
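Because delete_blobs reports failures instead of raising, callers can choose their own policy, as delete_by_prefix does above. One possible pattern is to re-submit only the failed keys; this is a hedged sketch in which the helper name, retry count, and import path are assumptions rather than part of this package:

```python
from guillotina.component import get_utility
from guillotina_gcloudstorage.interfaces import IGCloudBlobStore  # assumed module path


async def delete_keys_with_retry(keys, attempts=3):
    # Re-submit only the keys whose batched DELETE did not come back as 2xx,
    # relying on the (success_keys, failed_keys) pair returned above.
    util = get_utility(IGCloudBlobStore)
    remaining = list(keys)
    deleted = []
    for _ in range(attempts):
        if not remaining:
            break
        ok, remaining = await util.delete_blobs(keys=remaining)
        deleted.extend(ok)
    return deleted, remaining
```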


async def delete_bucket(self, bucket_name: Optional[str] = None):
"""
Delete the given bucket
@@ -675,7 +700,7 @@ async def delete_bucket(self, bucket_name: Optional[str] = None):
bucket_name = await self.get_bucket_name()

bucket = client.bucket(bucket_name)

try:
bucket.delete(force=True)
except ValueError:
