Skip to content

Commit

Permalink
Updating to implement new storage manager interface
Browse files Browse the repository at this point in the history
  • Loading branch information
kyle-widmann committed May 13, 2024
1 parent 6a34207 commit 7eb4354
Showing 1 changed file with 71 additions and 2 deletions.
73 changes: 71 additions & 2 deletions guillotina_gcloudstorage/storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import traceback
from async_lru import alru_cache
from datetime import datetime
from datetime import timedelta
Expand All @@ -24,11 +25,12 @@
from guillotina.utils import get_authenticated_user_id
from guillotina.utils import get_current_request
from guillotina.utils import to_str
from guillotina.interfaces.files import ICloudBlob
from guillotina_gcloudstorage.interfaces import IGCloudBlobStore
from guillotina_gcloudstorage.interfaces import IGCloudFile
from guillotina_gcloudstorage.interfaces import IGCloudFileField
from oauth2client.service_account import ServiceAccountCredentials
from typing import AsyncIterator
from typing import AsyncIterator, List, Optional, Tuple
from urllib.parse import quote_plus
from zope.interface import implementer

Expand Down Expand Up @@ -102,10 +104,16 @@ def dictfile_converter(value, field):
return GCloudFile(**value)


@implementer(IGCloudFile)
@implementer(IGCloudFile, ICloudBlob)
class GCloudFile(BaseCloudFile):
"""File stored in a GCloud, with a filename."""

def __init__(self, key: str, bucket: str, size: int, createdTime: Optional[datetime]):
self.key = key
self.bucket = bucket
self.size = size
self.createdTime = createdTime


def _is_uploaded_file(file):
return file is not None and isinstance(file, GCloudFile) and file.uri is not None
Expand Down Expand Up @@ -610,3 +618,64 @@ async def generate_download_signed_url(
if credentials:
request_args["credentials"] = credentials
return blob.generate_signed_url(**request_args)


async def get_blobs(self, page_token: Optional[str] = None, prefix=None, max_keys=1000) -> Tuple[List[ICloudBlob], str]:
"""
Get a page of items from the bucket
"""
page = await self.iterate_bucket_page(page_token, prefix)
blobs = [
GCloudFile(
key = item.get("name"),
bucket = item.get("bucket"),
createdTime = item.get("timeCreated"),
size = item.get("size")
)
for item
in page.get("items", [])
]
next_page_token = page.get("nextPageToken")

return blobs, next_page_token


async def delete_blobs(self, keys: List[str], bucket_name: Optional[str] = None) -> Tuple[List[str], List[str]]:
"""
Deletes a batch of files. Returns successful and failed keys.
"""
client = self.get_client()

if not bucket_name:
bucket_name = await self.get_bucket_name()

bucket = client.bucket(bucket_name)

with client.batch(raise_exception=False) as batch:
for key in keys:
bucket.delete_blob(key)

success_keys = []
failed_keys = []
# for response in batch._responses:
# # key = response
# key=1
# if 200 <= response.status_code <= 300:
# success_keys.append(key)
# else:
# failed_keys.append(key)

return success_keys, failed_keys


async def delete_bucket(self, bucket_name: Optional[str] = None):
"""
Delete the given bucket
"""
client = self.get_client()

if not bucket_name:
bucket_name = await self.get_bucket_name()

bucket = client.bucket(bucket_name)
bucket.delete(force=True)

0 comments on commit 7eb4354

Please sign in to comment.