Skip to content

Commit

Permalink
(PPS-411): Add bucket selection support for multi part upload (#123)
Browse files Browse the repository at this point in the history
* Add bucket param for multipart upload

* Resolve comments

* Add bucket param for single file upload

* add support for upload-single

* Get upload-multiple + multipart working

* Fix small files not working with multipart

* Fix upload

* chore(comment): insignificant commit to force test reruns

* Refactor upload-multiple

* Use file_name provided in manifest (#125)

* Use file_name provided in manifest

* Use file_name provided in manifest

* Ensure file_name used in url

* Ensure bucket passed to GeneratePresignedURL

* Fix batch

* fix utils

* fix utils.go

* cleanup+remove validateFilePath

---------

Co-authored-by: Alexander VanTol <Avantol13@users.noreply.github.com>
Co-authored-by: Brian <brian@bwalsh.com>
  • Loading branch information
3 people authored Jan 22, 2024
1 parent 6897add commit b9b3605
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 114 deletions.
2 changes: 2 additions & 0 deletions gen3-client/commonUtils/commonUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type FileUploadRequestObject struct {
PresignedURL string
Request *http.Request
Bar *pb.ProgressBar
Bucket string `json:"bucket,omitempty"`
}

// FileDownloadResponseObject defines a object for file download
Expand Down Expand Up @@ -105,6 +106,7 @@ type RetryObject struct {
GUID string
RetryCount int
Multipart bool
Bucket string
}

// ParseRootPath parses dirname that has "~" in the beginning
Expand Down
5 changes: 3 additions & 2 deletions gen3-client/g3cmd/retry-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
var guid string
var presignedURL string
var err error

fmt.Println()
if len(failedLogMap) == 0 {
log.Println("No failed file in log, no need to retry upload.")
Expand Down Expand Up @@ -96,7 +97,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {

if ro.Multipart {
fileInfo := FileInfo{FilePath: ro.FilePath, Filename: ro.Filename}
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount)
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, ro.GUID, ro.RetryCount, true)
handleFailedRetry(ro, retryObjCh, err, true)
Expand All @@ -109,7 +110,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
}
}
} else {
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata)
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, guid, ro.RetryCount, false)
handleFailedRetry(ro, retryObjCh, err, true)
Expand Down
8 changes: 4 additions & 4 deletions gen3-client/g3cmd/upload-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func retry(attempts int, filePath string, guid string, f func() error) (err erro
return fmt.Errorf("After %d attempts, last error: %s", attempts, err)
}

func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error {
func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int, bucketName string) error {
// NOTE @mpingram -- multipartUpload does not yet use the new Shepherd API
// because Shepherd does not yet support multipart uploads.
file, err := os.Open(fileInfo.FilePath)
Expand All @@ -61,7 +61,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return err
}

uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename)
uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
Expand All @@ -85,7 +85,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
for chunkIndex := range chunkIndexCh {
var presignedURL string
err = retry(MaxRetryCount, fileInfo.FilePath, guid, func() (err error) {
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex)
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex, bucketName)
return
})
if err != nil {
Expand Down Expand Up @@ -165,7 +165,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return parts[i].PartNumber < parts[j].PartNumber // sort parts in ascending order
})

if err = CompleteMultipartUpload(g3, key, uploadID, parts); err != nil {
if err = CompleteMultipartUpload(g3, key, uploadID, parts, bucketName); err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
return err
Expand Down
169 changes: 126 additions & 43 deletions gen3-client/g3cmd/upload-multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
Expand All @@ -17,20 +16,19 @@ import (
)

func init() {
var bucketName string
var manifestPath string
var uploadPath string
var batch bool
var numParallel int
var workers int
var respCh chan *http.Response
var errCh chan error
var batchFURObjects []commonUtils.FileUploadRequestObject
var forceMultipart bool
var includeSubDirName bool

var uploadMultipleCmd = &cobra.Command{
Use: "upload-multiple",
Short: "Upload multiple of files from a specified manifest",
Long: `Get presigned URLs for multiple of files specified in a manifest file and then upload all of them.`,
Example: `./gen3-client upload-multiple --profile=<profile-name> --manifest=<path-to-manifest/manifest.json> --upload-path=<path-to-file-dir/>`,
Long: `Get presigned URLs for multiple of files specified in a manifest file and then upload all of them. Options to run multipart uploads for large files and running multiple workers to batch upload available.`,
Example: `./gen3-client upload-multiple --profile=<profile-name> --manifest=<path-to-manifest/manifest.json> --upload-path=<path-to-file-dir/> --bucket=<bucket-name> --force-multipart=<boolean> --include-subdirname=<boolean> --batch=<boolean>`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("Notice: this is the upload method which requires the user to provide GUIDs. In this method files will be uploaded to specified GUIDs.\nIf your intention is to upload files without pre-existing GUIDs, consider to use \"./gen3-client upload\" instead.\n\n")

Expand Down Expand Up @@ -76,51 +74,54 @@ func init() {
log.Fatalln("A valid manifest can be acquired by using the \"Download Manifest\" button on " + dataExplorerURL)
}

furObjects := validateObject(objects, uploadPath)
uploadPath, err := commonUtils.GetAbsolutePath(uploadPath)
if err != nil {
log.Fatalf("Error when parsing file paths: " + err.Error())
}

if batch {
workers, respCh, errCh, batchFURObjects = initBatchUploadChannels(numParallel, len(objects))
filePaths := make([]string, 0)
for _, object := range objects {
// Here we are assuming the local filename will be the same as GUID
filePath, err := getFullFilePath(uploadPath, object.ObjectID)
if err != nil {
log.Println(err.Error())
continue
}
filePaths = append(filePaths, filePath)
}

for i, furObject := range furObjects {
if batch {
if len(batchFURObjects) < workers {
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
batchFURObjects = append(batchFURObjects, furObject)
}
if i == len(furObjects)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
}
} else {
file, err := os.Open(furObject.FilePath)
if err != nil {
log.Println("File open error: " + err.Error())
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
continue
}
defer file.Close()
singlePartFilePaths, multipartFilePaths := separateSingleAndMultipartUploads(filePaths, forceMultipart)

furObject, err := GenerateUploadRequest(gen3Interface, furObject, file)
if batch {
workers, respCh, errCh, batchFURObjects := initBatchUploadChannels(numParallel, len(singlePartFilePaths))
for i, filePath := range singlePartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
file.Close()
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
log.Printf("Error occurred during request generation: %s", err.Error())
continue
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error: " + err.Error())
return
}
err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
logs.IncrementScore(logs.ScoreBoardLen - 1)
if len(batchFURObjects) < workers {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject) //nolint:ineffassign
} else {
logs.IncrementScore(0)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject) //nolint:ineffassign
}
if !forceMultipart && i == len(singlePartFilePaths)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
}
file.Close()
}
} else {
processSingleUploads(gen3Interface, singlePartFilePaths, bucketName, includeSubDirName, uploadPath)
}
if len(multipartFilePaths) > 0 {
processMultipartUpload(gen3Interface, multipartFilePaths, bucketName, includeSubDirName, uploadPath)
}
if !logs.IsFailedLogMapEmpty() {
retryUpload(logs.GetFailedLogMap())
}
logs.PrintScoreBoard()
logs.CloseAll()
Expand All @@ -135,5 +136,87 @@ func init() {
uploadMultipleCmd.MarkFlagRequired("upload-path") //nolint:errcheck
uploadMultipleCmd.Flags().BoolVar(&batch, "batch", true, "Upload in parallel")
uploadMultipleCmd.Flags().IntVar(&numParallel, "numparallel", 3, "Number of uploads to run in parallel")
uploadMultipleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
uploadMultipleCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload when possible (file size >= 5MB)")
uploadMultipleCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
RootCmd.AddCommand(uploadMultipleCmd)
}

func processSingleUploads(gen3Interface Gen3Interface, singleFilePaths []string, bucketName string, includeSubDirName bool, uploadPath string) {
for _, filePath := range singleFilePaths {
file, err := os.Open(filePath)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("File open error: " + err.Error())
continue
}

startSingleFileUpload(gen3Interface, filePath, file, bucketName, includeSubDirName, uploadPath)
file.Close()
}
}

func startSingleFileUpload(gen3Interface Gen3Interface, filePath string, file *os.File, bucketName string, includeSubDirName bool, uploadPath string) {
fi, err := file.Stat()
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("File stat error for file" + fi.Name() + ", file may be missing or unreadable because of permissions.\n")
return
}

fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
return
}

respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
log.Println(err.Error())
return
}

logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)

furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, GUID: guid, PresignedURL: respURL}
furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
file.Close()
log.Printf("Error occurred during request generation: %s\n", err.Error())
return
}

err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
file.Close()
}

func processMultipartUpload(gen3Interface Gen3Interface, multipartFilePaths []string, bucketName string, includeSubDirName bool, uploadPath string) {
profileConfig := conf.ParseConfig(profile)
if profileConfig.UseShepherd == "true" ||
profileConfig.UseShepherd == "" && commonUtils.DefaultUseShepherd == true {
log.Fatalf("Error: Shepherd currently does not support multipart uploads. For the moment, please disable Shepherd with\n $ gen3-client configure --profile=%v --use-shepherd=false\nand try again.\n", profile)
}
log.Println("Multipart uploading....")

for _, filePath := range multipartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0, bucketName)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
}
}
4 changes: 3 additions & 1 deletion gen3-client/g3cmd/upload-single.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
func init() {
var guid string
var filePath string
var bucketName string

var uploadSingleCmd = &cobra.Command{
Use: "upload-single",
Expand Down Expand Up @@ -65,7 +66,7 @@ func init() {
}
defer file.Close()

furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid}
furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid, Bucket: bucketName}

furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
Expand Down Expand Up @@ -94,5 +95,6 @@ func init() {
uploadSingleCmd.MarkFlagRequired("guid") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&filePath, "file", "", "Specify file to upload to with --file=~/path/to/file")
uploadSingleCmd.MarkFlagRequired("file") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
RootCmd.AddCommand(uploadSingleCmd)
}
31 changes: 7 additions & 24 deletions gen3-client/g3cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

func init() {
var bucketName string
var includeSubDirName bool
var uploadPath string
var batch bool
Expand Down Expand Up @@ -70,7 +71,7 @@ func init() {
}
fmt.Println()

singlepartFilePaths, multipartFilePaths := validateFilePath(filePaths, forceMultipart)
singlepartFilePaths, multipartFilePaths := separateSingleAndMultipartUploads(filePaths, forceMultipart)

if batch {
workers, respCh, errCh, batchFURObjects := initBatchUploadChannels(numParallel, len(singlepartFilePaths))
Expand All @@ -85,13 +86,13 @@ func init() {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
}
}
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)

if len(errCh) > 0 {
close(errCh)
Expand Down Expand Up @@ -123,7 +124,7 @@ func init() {
continue
}
// The following flow is for singlepart upload flow
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata)
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
log.Println(err.Error())
Expand Down Expand Up @@ -153,26 +154,7 @@ func init() {
if len(multipartFilePaths) > 0 {
// NOTE(@mpingram) - For the moment Shepherd doesn't support multipart uploads.
// Throw an error if Shepherd is enabled and user attempts to multipart upload.
profileConfig := conf.ParseConfig(profile)
if profileConfig.UseShepherd == "true" ||
profileConfig.UseShepherd == "" && commonUtils.DefaultUseShepherd == true {
log.Fatalf("Error: Shepherd currently does not support multipart uploads. For the moment, please disable Shepherd with\n $ gen3-client configure --profile=%v --use-shepherd=false\nand try again.\n", profile)
}
log.Println("Multipart uploading....")
for _, filePath := range multipartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
}
processMultipartUpload(gen3Interface, multipartFilePaths, bucketName, includeSubDirName, uploadPath)
}

if !logs.IsFailedLogMapEmpty() {
Expand All @@ -192,5 +174,6 @@ func init() {
uploadCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
uploadCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload if possible")
uploadCmd.Flags().BoolVar(&hasMetadata, "metadata", false, "Search for and upload file metadata alongside the file")
uploadCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
RootCmd.AddCommand(uploadCmd)
}
Loading

0 comments on commit b9b3605

Please sign in to comment.