Skip to content

Commit

Permalink
Merge pull request #1112 from uc-cdis/feat/multi_part_bucket_param
Browse files Browse the repository at this point in the history
(PPS-411): Add bucket selection support for multi part upload
  • Loading branch information
BinamB authored Apr 5, 2024
2 parents d7d277e + 5cc14c1 commit 558ac4e
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 68 deletions.
40 changes: 21 additions & 19 deletions fence/blueprints/data/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
BlankIndex,
IndexedFile,
get_signed_url_for_file,
verify_data_upload_bucket_configuration,
)
from fence.config import config
from fence.errors import Forbidden, InternalError, UserError, Unauthorized
Expand Down Expand Up @@ -177,15 +178,7 @@ def upload_data_file():
protocol = params["protocol"] if "protocol" in params else None
bucket = params.get("bucket")
if bucket:
s3_buckets = get_value(
flask.current_app.config,
"ALLOWED_DATA_UPLOAD_BUCKETS",
InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured"),
)
if bucket not in s3_buckets:
logger.debug(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")

verify_data_upload_bucket_configuration(bucket)
response = {
"guid": blank_index.guid,
"url": blank_index.make_signed_url(
Expand Down Expand Up @@ -224,10 +217,16 @@ def init_multipart_upload():
default=default_expires_in,
)

bucket = params.get("bucket")
if bucket:
verify_data_upload_bucket_configuration(bucket)

response = {
"guid": blank_index.guid,
"uploadId": BlankIndex.init_multipart_upload(
blank_index.guid + "/" + params["file_name"], expires_in=expires_in
blank_index.guid + "/" + params["file_name"],
expires_in=expires_in,
bucket=bucket,
),
}
return flask.jsonify(response), 201
Expand Down Expand Up @@ -256,12 +255,17 @@ def generate_multipart_upload_presigned_url():
default=default_expires_in,
)

bucket = params.get("bucket")
if bucket:
verify_data_upload_bucket_configuration(bucket)

response = {
"presigned_url": BlankIndex.generate_aws_presigned_url_for_part(
params["key"],
params["uploadId"],
params["partNumber"],
expires_in=expires_in,
bucket=bucket,
)
}
return flask.jsonify(response), 200
Expand All @@ -284,6 +288,7 @@ def complete_multipart_upload():
raise UserError("missing required arguments: {}".format(list(missing)))

default_expires_in = flask.current_app.config.get("MAX_PRESIGNED_URL_TTL", 3600)
bucket = params.get("bucket")
expires_in = get_valid_expiration(
params.get("expires_in"),
max_limit=default_expires_in,
Expand All @@ -292,7 +297,11 @@ def complete_multipart_upload():

try:
BlankIndex.complete_multipart_upload(
params["key"], params["uploadId"], params["parts"], expires_in=expires_in
params["key"],
params["uploadId"],
params["parts"],
expires_in=expires_in,
bucket=bucket,
),
except InternalError as e:
return flask.jsonify({"message": e.message}), e.code
Expand All @@ -311,14 +320,7 @@ def upload_file(file_id):

bucket = flask.request.args.get("bucket")
if bucket:
s3_buckets = get_value(
flask.current_app.config,
"ALLOWED_DATA_UPLOAD_BUCKETS",
InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured"),
)
if bucket not in s3_buckets:
logger.debug(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")
verify_data_upload_bucket_configuration(bucket)

result = get_signed_url_for_file(
"upload", file_id, file_name=file_name, bucket=bucket
Expand Down
49 changes: 23 additions & 26 deletions fence/blueprints/data/indexd.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,9 @@ def make_signed_url(self, file_name, protocol=None, expires_in=None, bucket=None
)
else:
if not bucket:
try:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]

self.logger.debug("Attemping to upload to bucket '{}'".format(bucket))
self.logger.debug("Attempting to upload to bucket '{}'".format(bucket))
s3_url = "s3://{}/{}/{}".format(bucket, self.guid, file_name)
url = S3IndexedFileLocation(s3_url).get_signed_url("upload", expires_in)

Expand All @@ -353,7 +348,7 @@ def make_signed_url(self, file_name, protocol=None, expires_in=None, bucket=None
return url

@staticmethod
def init_multipart_upload(key, expires_in=None):
def init_multipart_upload(key, expires_in=None, bucket=None):
"""
Initialize multipart upload given key
Expand All @@ -363,17 +358,13 @@ def init_multipart_upload(key, expires_in=None):
Returns:
uploadId(str)
"""
try:
if not bucket:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
return S3IndexedFileLocation(s3_url).init_multipart_upload(expires_in)

@staticmethod
def complete_multipart_upload(key, uploadId, parts, expires_in=None):
def complete_multipart_upload(key, uploadId, parts, expires_in=None, bucket=None):
"""
Complete multipart upload
Expand All @@ -386,19 +377,19 @@ def complete_multipart_upload(key, uploadId, parts, expires_in=None):
Returns:
None if success otherwise an exception
"""
try:
if bucket:
verify_data_upload_bucket_configuration(bucket)
else:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
S3IndexedFileLocation(s3_url).complete_multipart_upload(
uploadId, parts, expires_in
)

@staticmethod
def generate_aws_presigned_url_for_part(key, uploadId, partNumber, expires_in):
def generate_aws_presigned_url_for_part(
key, uploadId, partNumber, expires_in, bucket=None
):
"""
Generate presigned url for each part
Expand All @@ -410,12 +401,10 @@ def generate_aws_presigned_url_for_part(key, uploadId, partNumber, expires_in):
Returns:
presigned_url(str)
"""
try:
if bucket:
verify_data_upload_bucket_configuration(bucket)
else:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
return S3IndexedFileLocation(s3_url).generate_presigned_url_for_part_upload(
uploadId, partNumber, expires_in
Expand Down Expand Up @@ -1089,7 +1078,7 @@ def init_multipart_upload(self, expires_in):
self.bucket_name(), aws_creds, expires_in
)

return multipart_upload.initilize_multipart_upload(
return multipart_upload.initialize_multipart_upload(
self.parsed_url.netloc, self.parsed_url.path.strip("/"), credentials
)

Expand Down Expand Up @@ -1660,3 +1649,11 @@ def filter_auth_ids(action, list_auth_ids):
if checked_permission in values:
authorized_dbgaps.append(key)
return authorized_dbgaps


def verify_data_upload_bucket_configuration(bucket):
    """
    Verify that uploading to the given bucket is permitted by configuration.

    Args:
        bucket(str): name of the bucket requested for the upload

    Returns:
        None if the bucket is listed in ALLOWED_DATA_UPLOAD_BUCKETS

    Raises:
        InternalError: ALLOWED_DATA_UPLOAD_BUCKETS is missing or unset
        Forbidden: the bucket is not listed in ALLOWED_DATA_UPLOAD_BUCKETS
    """
    # Look the setting up with .get() so that a missing or null value is
    # reported as an explicit configuration error rather than surfacing as a
    # bare KeyError / TypeError from the membership test below.
    s3_buckets = flask.current_app.config.get("ALLOWED_DATA_UPLOAD_BUCKETS")
    if s3_buckets is None:
        raise InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured")

    if bucket not in s3_buckets:
        logger.error(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
        logger.debug(f"Buckets configured in ALLOWED_DATA_UPLOAD_BUCKETS {s3_buckets}")
        raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")
57 changes: 48 additions & 9 deletions fence/blueprints/data/multipart_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
from retry.api import retry_call

from cdispyutils.hmac4 import generate_aws_presigned_url
from cdispyutils.config import get_value
from cdislogging import get_logger
from fence.config import config
from fence.errors import InternalError

MAX_TRIES = 5

logger = get_logger(__name__)


def initilize_multipart_upload(bucket, key, credentials):
def initialize_multipart_upload(bucket_name, key, credentials):
"""
Initialize multipart upload
Expand All @@ -23,17 +25,30 @@ def initilize_multipart_upload(bucket, key, credentials):
Returns:
UploadId(str): uploadId
"""
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

url = ""
if bucket.get("endpoint_url"):
url = bucket["endpoint_url"]

session = boto3.Session(
aws_access_key_id=credentials["aws_access_key_id"],
aws_secret_access_key=credentials["aws_secret_access_key"],
aws_session_token=credentials.get("aws_session_token"),
)
s3client = session.client("s3")
s3client = None
if url:
s3client = session.client("s3", endpoint_url=url)
else:
s3client = session.client("s3")

try:
multipart_upload = retry_call(
s3client.create_multipart_upload,
fkwargs={"Bucket": bucket, "Key": key},
fkwargs={"Bucket": bucket_name, "Key": key},
tries=MAX_TRIES,
jitter=10,
)
Expand All @@ -48,7 +63,7 @@ def initilize_multipart_upload(bucket, key, credentials):
return multipart_upload.get("UploadId")


def complete_multipart_upload(bucket, key, credentials, uploadId, parts):
def complete_multipart_upload(bucket_name, key, credentials, uploadId, parts):
"""
Complete multipart upload.
Raise exception if something wrong happens; otherwise success
Expand All @@ -64,18 +79,31 @@ def complete_multipart_upload(bucket, key, credentials, uploadId, parts):
Return:
None
"""
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

url = ""
if bucket.get("endpoint_url"):
url = bucket["endpoint_url"]

session = boto3.Session(
aws_access_key_id=credentials["aws_access_key_id"],
aws_secret_access_key=credentials["aws_secret_access_key"],
aws_session_token=credentials.get("aws_session_token"),
)
s3client = session.client("s3")
s3client = None
if url:
s3client = session.client("s3", endpoint_url=url)
else:
s3client = session.client("s3")

try:
retry_call(
s3client.complete_multipart_upload,
fkwargs={
"Bucket": bucket,
"Bucket": bucket_name,
"Key": key,
"MultipartUpload": {"Parts": parts},
"UploadId": uploadId,
Expand All @@ -95,7 +123,7 @@ def complete_multipart_upload(bucket, key, credentials, uploadId, parts):


def generate_presigned_url_for_uploading_part(
bucket, key, credentials, uploadId, partNumber, region, expires
bucket_name, key, credentials, uploadId, partNumber, region, expires
):
"""
Generate presigned url for uploading object part given uploadId and part number
Expand All @@ -113,13 +141,24 @@ def generate_presigned_url_for_uploading_part(
presigned_url(str)
"""

url = "https://{}.s3.amazonaws.com/{}".format(bucket, key)
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

if bucket.get("endpoint_url"):
url = bucket["endpoint_url"].strip("/") + "/{}/{}".format(
bucket_name, key.strip("/")
)
else:
url = "https://{}.s3.amazonaws.com/{}".format(bucket_name, key)
additional_signed_qs = {"partNumber": str(partNumber), "uploadId": uploadId}

try:
return generate_aws_presigned_url(
presigned_url = generate_aws_presigned_url(
url, "PUT", credentials, "s3", region, expires, additional_signed_qs
)
return presigned_url
except Exception as e:
raise InternalError(
"Can not generate presigned url for part number {} of key {}. Detail {}".format(
Expand Down
1 change: 1 addition & 0 deletions fence/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def post_process(self):
"CIRRUS_CFG",
"WHITE_LISTED_GOOGLE_PARENT_ORGS",
"CLIENT_CREDENTIALS_ON_DOWNLOAD_ENABLED",
"DATA_UPLOAD_BUCKET",
]
for default in defaults:
self.force_default_if_none(default, default_cfg=default_config)
Expand Down
13 changes: 13 additions & 0 deletions openapis/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1717,10 +1717,15 @@ components:
type: string
description: requested authorization resources to be set on the
resulting indexed record. You must have proper authorization to set this
bucket:
type: string
required: false
description: bucket to upload to
example:
file_name: "my_file.bam"
expires_in: 1200
authz: ["/programs/A"]
bucket: "bucket-1"
RequestMultipartUpload:
type: object
required:
Expand All @@ -1738,6 +1743,10 @@ components:
expires_in:
type: integer
description: optional integer specifying the presigned URL lifetime
bucket:
type: string
required: false
description: bucket to upload to
CompleteMultipartUpload:
type: object
required:
Expand Down Expand Up @@ -1767,6 +1776,10 @@ components:
expires_in:
type: integer
description: optional integer specifying the presigned URL lifetime
bucket:
type: string
required: false
description: bucket to upload to
CredentialsSource:
type: object
required:
Expand Down
Loading

0 comments on commit 558ac4e

Please sign in to comment.