Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(PPS-411): Add bucket selection support for multi part upload #1112

Merged
merged 16 commits into from
Apr 5, 2024
40 changes: 21 additions & 19 deletions fence/blueprints/data/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
BlankIndex,
IndexedFile,
get_signed_url_for_file,
verify_data_upload_bucket_configuration,
)
from fence.config import config
from fence.errors import Forbidden, InternalError, UserError, Unauthorized
Expand Down Expand Up @@ -177,15 +178,7 @@ def upload_data_file():
protocol = params["protocol"] if "protocol" in params else None
bucket = params.get("bucket")
if bucket:
s3_buckets = get_value(
flask.current_app.config,
"ALLOWED_DATA_UPLOAD_BUCKETS",
InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured"),
)
if bucket not in s3_buckets:
logger.debug(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")

verify_data_upload_bucket_configuration(bucket)
response = {
"guid": blank_index.guid,
"url": blank_index.make_signed_url(
Expand Down Expand Up @@ -224,10 +217,16 @@ def init_multipart_upload():
default=default_expires_in,
)

bucket = params.get("bucket")
if bucket:
verify_data_upload_bucket_configuration(bucket)

response = {
"guid": blank_index.guid,
"uploadId": BlankIndex.init_multipart_upload(
blank_index.guid + "/" + params["file_name"], expires_in=expires_in
blank_index.guid + "/" + params["file_name"],
expires_in=expires_in,
bucket=bucket,
),
}
return flask.jsonify(response), 201
Expand Down Expand Up @@ -256,12 +255,17 @@ def generate_multipart_upload_presigned_url():
default=default_expires_in,
)

bucket = params.get("bucket")
if bucket:
verify_data_upload_bucket_configuration(bucket)

response = {
"presigned_url": BlankIndex.generate_aws_presigned_url_for_part(
params["key"],
params["uploadId"],
params["partNumber"],
expires_in=expires_in,
bucket=bucket,
)
}
return flask.jsonify(response), 200
Expand All @@ -284,6 +288,7 @@ def complete_multipart_upload():
raise UserError("missing required arguments: {}".format(list(missing)))

default_expires_in = flask.current_app.config.get("MAX_PRESIGNED_URL_TTL", 3600)
bucket = params.get("bucket")
Avantol13 marked this conversation as resolved.
Show resolved Hide resolved
expires_in = get_valid_expiration(
params.get("expires_in"),
max_limit=default_expires_in,
Expand All @@ -292,7 +297,11 @@ def complete_multipart_upload():

try:
BlankIndex.complete_multipart_upload(
params["key"], params["uploadId"], params["parts"], expires_in=expires_in
params["key"],
params["uploadId"],
params["parts"],
expires_in=expires_in,
bucket=bucket,
),
except InternalError as e:
return flask.jsonify({"message": e.message}), e.code
Expand All @@ -311,14 +320,7 @@ def upload_file(file_id):

bucket = flask.request.args.get("bucket")
if bucket:
s3_buckets = get_value(
flask.current_app.config,
"ALLOWED_DATA_UPLOAD_BUCKETS",
InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured"),
)
if bucket not in s3_buckets:
logger.debug(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")
verify_data_upload_bucket_configuration(bucket)

result = get_signed_url_for_file(
"upload", file_id, file_name=file_name, bucket=bucket
Expand Down
49 changes: 23 additions & 26 deletions fence/blueprints/data/indexd.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,9 @@ def make_signed_url(self, file_name, protocol=None, expires_in=None, bucket=None
)
else:
if not bucket:
try:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]

self.logger.debug("Attemping to upload to bucket '{}'".format(bucket))
self.logger.debug("Attempting to upload to bucket '{}'".format(bucket))
s3_url = "s3://{}/{}/{}".format(bucket, self.guid, file_name)
url = S3IndexedFileLocation(s3_url).get_signed_url("upload", expires_in)

Expand All @@ -353,7 +348,7 @@ def make_signed_url(self, file_name, protocol=None, expires_in=None, bucket=None
return url

@staticmethod
def init_multipart_upload(key, expires_in=None):
def init_multipart_upload(key, expires_in=None, bucket=None):
"""
Initialize multipart upload given key

Expand All @@ -363,17 +358,13 @@ def init_multipart_upload(key, expires_in=None):
Returns:
uploadId(str)
"""
try:
if not bucket:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
return S3IndexedFileLocation(s3_url).init_multipart_upload(expires_in)

@staticmethod
def complete_multipart_upload(key, uploadId, parts, expires_in=None):
def complete_multipart_upload(key, uploadId, parts, expires_in=None, bucket=None):
"""
Complete multipart upload

Expand All @@ -386,19 +377,19 @@ def complete_multipart_upload(key, uploadId, parts, expires_in=None):
Returns:
None if success otherwise an exception
"""
try:
if bucket:
verify_data_upload_bucket_configuration(bucket)
else:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
S3IndexedFileLocation(s3_url).complete_multipart_upload(
uploadId, parts, expires_in
)

@staticmethod
def generate_aws_presigned_url_for_part(key, uploadId, partNumber, expires_in):
def generate_aws_presigned_url_for_part(
key, uploadId, partNumber, expires_in, bucket=None
):
"""
Generate presigned url for each part

Expand All @@ -410,12 +401,10 @@ def generate_aws_presigned_url_for_part(key, uploadId, partNumber, expires_in):
Returns:
presigned_url(str)
"""
try:
if bucket:
verify_data_upload_bucket_configuration(bucket)
else:
bucket = flask.current_app.config["DATA_UPLOAD_BUCKET"]
except KeyError:
raise InternalError(
"fence not configured with data upload bucket; can't create signed URL"
)
s3_url = "s3://{}/{}".format(bucket, key)
return S3IndexedFileLocation(s3_url).generate_presigned_url_for_part_upload(
uploadId, partNumber, expires_in
Expand Down Expand Up @@ -1089,7 +1078,7 @@ def init_multipart_upload(self, expires_in):
self.bucket_name(), aws_creds, expires_in
)

return multipart_upload.initilize_multipart_upload(
return multipart_upload.initialize_multipart_upload(
self.parsed_url.netloc, self.parsed_url.path.strip("/"), credentials
)

Expand Down Expand Up @@ -1660,3 +1649,11 @@ def filter_auth_ids(action, list_auth_ids):
if checked_permission in values:
authorized_dbgaps.append(key)
return authorized_dbgaps


def verify_data_upload_bucket_configuration(bucket):
    """
    Check that the requested bucket is allowed to receive data uploads.

    Args:
        bucket(str): name of the bucket requested by the caller

    Returns:
        None if the bucket is listed in ALLOWED_DATA_UPLOAD_BUCKETS

    Raises:
        InternalError: ALLOWED_DATA_UPLOAD_BUCKETS is missing from the app
            config or is set to None
        Forbidden: the bucket is not listed in ALLOWED_DATA_UPLOAD_BUCKETS
    """
    s3_buckets = flask.current_app.config.get("ALLOWED_DATA_UPLOAD_BUCKETS")
    if s3_buckets is None:
        # A missing/None setting is a deployment problem, not a client error:
        # report it explicitly instead of leaking a KeyError (missing key) or
        # a TypeError (membership test against None).
        raise InternalError("ALLOWED_DATA_UPLOAD_BUCKETS not configured")
    if bucket not in s3_buckets:
        logger.error(f"Bucket '{bucket}' not in ALLOWED_DATA_UPLOAD_BUCKETS config")
        logger.debug(f"Buckets configured in ALLOWED_DATA_UPLOAD_BUCKETS {s3_buckets}")
        raise Forbidden(f"Uploading to bucket '{bucket}' is not allowed")
57 changes: 48 additions & 9 deletions fence/blueprints/data/multipart_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
from retry.api import retry_call

from cdispyutils.hmac4 import generate_aws_presigned_url
from cdispyutils.config import get_value
from cdislogging import get_logger
from fence.config import config
from fence.errors import InternalError

MAX_TRIES = 5

logger = get_logger(__name__)


def initilize_multipart_upload(bucket, key, credentials):
def initialize_multipart_upload(bucket_name, key, credentials):
"""
Initialize multipart upload

Expand All @@ -23,17 +25,30 @@ def initilize_multipart_upload(bucket, key, credentials):
Returns:
UploadId(str): uploadId
"""
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

url = ""
if bucket.get("endpoint_url"):
url = bucket["endpoint_url"]

session = boto3.Session(
aws_access_key_id=credentials["aws_access_key_id"],
aws_secret_access_key=credentials["aws_secret_access_key"],
aws_session_token=credentials.get("aws_session_token"),
)
s3client = session.client("s3")
s3client = None
if url:
s3client = session.client("s3", endpoint_url=url)
else:
s3client = session.client("s3")

try:
multipart_upload = retry_call(
s3client.create_multipart_upload,
fkwargs={"Bucket": bucket, "Key": key},
fkwargs={"Bucket": bucket_name, "Key": key},
tries=MAX_TRIES,
jitter=10,
)
Expand All @@ -48,7 +63,7 @@ def initilize_multipart_upload(bucket, key, credentials):
return multipart_upload.get("UploadId")


def complete_multipart_upload(bucket, key, credentials, uploadId, parts):
def complete_multipart_upload(bucket_name, key, credentials, uploadId, parts):
"""
Complete multipart upload.
Raise exception if something wrong happens; otherwise success
Expand All @@ -64,18 +79,31 @@ def complete_multipart_upload(bucket, key, credentials, uploadId, parts):
Return:
None
"""
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

url = ""
if bucket.get("endpoint_url"):
url = bucket["endpoint_url"]

session = boto3.Session(
aws_access_key_id=credentials["aws_access_key_id"],
aws_secret_access_key=credentials["aws_secret_access_key"],
aws_session_token=credentials.get("aws_session_token"),
)
s3client = session.client("s3")
s3client = None
if url:
s3client = session.client("s3", endpoint_url=url)
else:
s3client = session.client("s3")

try:
retry_call(
s3client.complete_multipart_upload,
fkwargs={
"Bucket": bucket,
"Bucket": bucket_name,
"Key": key,
"MultipartUpload": {"Parts": parts},
"UploadId": uploadId,
Expand All @@ -95,7 +123,7 @@ def complete_multipart_upload(bucket, key, credentials, uploadId, parts):


def generate_presigned_url_for_uploading_part(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are all positive tests. I know they were here before, but can you add some for the error cases like when DATA_UPLOAD and/or ALLOWED_DATA_UPLOAD_BUCKETS buckets aren't specified / are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like my previous comment might resolve this test case? 🤔

bucket, key, credentials, uploadId, partNumber, region, expires
bucket_name, key, credentials, uploadId, partNumber, region, expires
):
"""
Generate presigned url for uploading object part given uploadId and part number
Expand All @@ -113,13 +141,24 @@ def generate_presigned_url_for_uploading_part(
presigned_url(str)
"""

url = "https://{}.s3.amazonaws.com/{}".format(bucket, key)
s3_buckets = get_value(
config, "S3_BUCKETS", InternalError("S3_BUCKETS not configured")
)
bucket = s3_buckets.get(bucket_name)

if bucket.get("endpoint_url"):
url = bucket["endpoint_url"].strip("/") + "/{}/{}".format(
bucket_name, key.strip("/")
)
else:
url = "https://{}.s3.amazonaws.com/{}".format(bucket_name, key)
additional_signed_qs = {"partNumber": str(partNumber), "uploadId": uploadId}

try:
return generate_aws_presigned_url(
presigned_url = generate_aws_presigned_url(
url, "PUT", credentials, "s3", region, expires, additional_signed_qs
)
return presigned_url
except Exception as e:
raise InternalError(
"Can not generate presigned url for part number {} of key {}. Detail {}".format(
Expand Down
1 change: 1 addition & 0 deletions fence/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def post_process(self):
"CIRRUS_CFG",
"WHITE_LISTED_GOOGLE_PARENT_ORGS",
"CLIENT_CREDENTIALS_ON_DOWNLOAD_ENABLED",
"DATA_UPLOAD_BUCKET",
]
for default in defaults:
self.force_default_if_none(default, default_cfg=default_config)
Expand Down
13 changes: 13 additions & 0 deletions openapis/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1717,10 +1717,15 @@ components:
type: string
description: requested authorization resources to be set on the
resulting indexed record. You must have proper authorization to set this
bucket:
type: string
required: false
description: bucket to upload to
example:
file_name: "my_file.bam"
expires_in: 1200
authz: ["/programs/A"]
bucket: "bucket-1"
RequestMultipartUpload:
type: object
required:
Expand All @@ -1738,6 +1743,10 @@ components:
expires_in:
type: integer
description: optional integer specifying the presigned URL lifetime
bucket:
type: string
required: false
description: bucket to upload to
CompleteMultipartUpload:
type: object
required:
Expand Down Expand Up @@ -1767,6 +1776,10 @@ components:
expires_in:
type: integer
description: optional integer specifying the presigned URL lifetime
bucket:
type: string
required: false
description: bucket to upload to
CredentialsSource:
type: object
required:
Expand Down
Loading
Loading