Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(PPS-411): Add bucket selection support for multi part upload #123

Merged
merged 17 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gen3-client/commonUtils/commonUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type FileUploadRequestObject struct {
PresignedURL string
Request *http.Request
Bar *pb.ProgressBar
Bucket string `json:"bucket,omitempty"`
}

// FileDownloadResponseObject defines a object for file download
Expand Down Expand Up @@ -105,6 +106,7 @@ type RetryObject struct {
GUID string
RetryCount int
Multipart bool
Bucket string
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
}

// ParseRootPath parses dirname that has "~" in the beginning
Expand Down
5 changes: 3 additions & 2 deletions gen3-client/g3cmd/retry-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
var guid string
var presignedURL string
var err error

fmt.Println()
if len(failedLogMap) == 0 {
log.Println("No failed file in log, no need to retry upload.")
Expand Down Expand Up @@ -96,7 +97,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {

if ro.Multipart {
fileInfo := FileInfo{FilePath: ro.FilePath, Filename: ro.Filename}
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount)
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, ro.GUID, ro.RetryCount, true)
handleFailedRetry(ro, retryObjCh, err, true)
Expand All @@ -109,7 +110,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
}
}
} else {
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata)
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, guid, ro.RetryCount, false)
handleFailedRetry(ro, retryObjCh, err, true)
Expand Down
8 changes: 4 additions & 4 deletions gen3-client/g3cmd/upload-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func retry(attempts int, filePath string, guid string, f func() error) (err erro
return fmt.Errorf("After %d attempts, last error: %s", attempts, err)
}

func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error {
func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int, bucketName string) error {
// NOTE @mpingram -- multipartUpload does not yet use the new Shepherd API
// because Shepherd does not yet support multipart uploads.
file, err := os.Open(fileInfo.FilePath)
Expand All @@ -61,7 +61,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return err
}

uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename)
uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
Expand All @@ -85,7 +85,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
for chunkIndex := range chunkIndexCh {
var presignedURL string
err = retry(MaxRetryCount, fileInfo.FilePath, guid, func() (err error) {
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex)
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex, bucketName)
return
})
if err != nil {
Expand Down Expand Up @@ -165,7 +165,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return parts[i].PartNumber < parts[j].PartNumber // sort parts in ascending order
})

if err = CompleteMultipartUpload(g3, key, uploadID, parts); err != nil {
if err = CompleteMultipartUpload(g3, key, uploadID, parts, bucketName); err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
return err
Expand Down
169 changes: 126 additions & 43 deletions gen3-client/g3cmd/upload-multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
Expand All @@ -17,20 +16,19 @@ import (
)

func init() {
var bucketName string
var manifestPath string
var uploadPath string
var batch bool
var numParallel int
var workers int
var respCh chan *http.Response
var errCh chan error
var batchFURObjects []commonUtils.FileUploadRequestObject
var forceMultipart bool
var includeSubDirName bool
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved

var uploadMultipleCmd = &cobra.Command{
Use: "upload-multiple",
Short: "Upload multiple of files from a specified manifest",
Long: `Get presigned URLs for multiple of files specified in a manifest file and then upload all of them.`,
Example: `./gen3-client upload-multiple --profile=<profile-name> --manifest=<path-to-manifest/manifest.json> --upload-path=<path-to-file-dir/>`,
Long: `Get presigned URLs for multiple of files specified in a manifest file and then upload all of them. Options to run multipart uploads for large files and running multiple workers to batch upload available.`,
Example: `./gen3-client upload-multiple --profile=<profile-name> --manifest=<path-to-manifest/manifest.json> --upload-path=<path-to-file-dir/> --bucket=<bucket-name> --force-multipart=<boolean> --include-subdirname=<boolean> --batch=<boolean>`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("Notice: this is the upload method which requires the user to provide GUIDs. In this method files will be uploaded to specified GUIDs.\nIf your intention is to upload files without pre-existing GUIDs, consider to use \"./gen3-client upload\" instead.\n\n")

Expand Down Expand Up @@ -76,51 +74,54 @@ func init() {
log.Fatalln("A valid manifest can be acquired by using the \"Download Manifest\" button on " + dataExplorerURL)
}

furObjects := validateObject(objects, uploadPath)
uploadPath, err := commonUtils.GetAbsolutePath(uploadPath)
if err != nil {
log.Fatalf("Error when parsing file paths: " + err.Error())
}

if batch {
workers, respCh, errCh, batchFURObjects = initBatchUploadChannels(numParallel, len(objects))
filePaths := make([]string, 0)
for _, object := range objects {
// Here we are assuming the local filename will be the same as GUID
filePath, err := getFullFilePath(uploadPath, object.ObjectID)
if err != nil {
log.Println(err.Error())
continue
}
filePaths = append(filePaths, filePath)
}

for i, furObject := range furObjects {
if batch {
if len(batchFURObjects) < workers {
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
batchFURObjects = append(batchFURObjects, furObject)
}
if i == len(furObjects)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
}
} else {
file, err := os.Open(furObject.FilePath)
if err != nil {
log.Println("File open error: " + err.Error())
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
continue
}
defer file.Close()
singlePartFilePaths, multipartFilePaths := separateSingleAndMultipartUploads(filePaths, forceMultipart)

furObject, err := GenerateUploadRequest(gen3Interface, furObject, file)
if batch {
workers, respCh, errCh, batchFURObjects := initBatchUploadChannels(numParallel, len(singlePartFilePaths))
for i, filePath := range singlePartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
file.Close()
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
log.Printf("Error occurred during request generation: %s", err.Error())
continue
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error: " + err.Error())
return
}
err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
logs.IncrementScore(logs.ScoreBoardLen - 1)
if len(batchFURObjects) < workers {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject) //nolint:ineffassign
} else {
logs.IncrementScore(0)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I don't believe multipartUpload gets called at all if part of a batch? @BinamB

batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject) //nolint:ineffassign
}
if !forceMultipart && i == len(singlePartFilePaths)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
}
file.Close()
}
} else {
processSingleUploads(gen3Interface, singlePartFilePaths, bucketName, includeSubDirName, uploadPath)
}
if len(multipartFilePaths) > 0 {
processMultipartUpload(gen3Interface, multipartFilePaths, bucketName, includeSubDirName, uploadPath)
}
if !logs.IsFailedLogMapEmpty() {
retryUpload(logs.GetFailedLogMap())
}
logs.PrintScoreBoard()
logs.CloseAll()
Expand All @@ -135,5 +136,87 @@ func init() {
uploadMultipleCmd.MarkFlagRequired("upload-path") //nolint:errcheck
uploadMultipleCmd.Flags().BoolVar(&batch, "batch", true, "Upload in parallel")
uploadMultipleCmd.Flags().IntVar(&numParallel, "numparallel", 3, "Number of uploads to run in parallel")
uploadMultipleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
uploadMultipleCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload when possible (file size >= 5MB)")
uploadMultipleCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
RootCmd.AddCommand(uploadMultipleCmd)
}

// processSingleUploads walks singleFilePaths in order and uploads each file
// individually. A file that cannot be opened is recorded in the failed log
// and skipped; every other file is handed to startSingleFileUpload along with
// the target bucket, the sub-directory naming option and the upload root, and
// its handle is closed once that call returns.
func processSingleUploads(gen3Interface Gen3Interface, singleFilePaths []string, bucketName string, includeSubDirName bool, uploadPath string) {
	for idx := range singleFilePaths {
		path := singleFilePaths[idx]

		handle, openErr := os.Open(path)
		if openErr != nil {
			// No GUID exists yet at this point, so the failed-log entry carries an empty one.
			logs.AddToFailedLog(path, filepath.Base(path), commonUtils.FileMetadata{}, "", 0, false, true)
			log.Println("File open error: " + openErr.Error())
			continue
		}

		startSingleFileUpload(gen3Interface, path, handle, bucketName, includeSubDirName, uploadPath)
		handle.Close()
	}
}

// startSingleFileUpload uploads one already-opened file via a presigned URL.
//
// It checks that the file can be stat'ed, derives the upload file name and
// metadata with ProcessFilename, asks GeneratePresignedURL for a URL and GUID
// for bucketName, builds the request with GenerateUploadRequest and finally
// calls uploadFile. Failures are recorded through logs.AddToFailedLog and
// logged; nothing is returned to the caller. The score board is incremented
// only when uploadFile succeeds.
//
// Parameters:
//   - gen3Interface: client used to obtain the presigned URL and build the request.
//   - filePath: path of the file on disk; used for logging and name processing.
//   - file: open handle for filePath.
//   - bucketName: target bucket, passed through to GeneratePresignedURL.
//   - includeSubDirName, uploadPath: forwarded to ProcessFilename.
func startSingleFileUpload(gen3Interface Gen3Interface, filePath string, file *os.File, bucketName string, includeSubDirName bool, uploadPath string) {
	// When Stat fails the returned FileInfo is nil, so the file is named from
	// its path rather than from the (nil) FileInfo.
	if _, err := file.Stat(); err != nil {
		logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
		log.Println("File stat error for file " + filepath.Base(filePath) + ", file may be missing or unreadable because of permissions.\n")
		return
	}

	fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
	if err != nil {
		logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
		log.Println("Process filename error for file: " + err.Error())
		return
	}

	respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
	if err != nil {
		logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
		log.Println(err.Error())
		return
	}

	// The file is entered into the failed log before the upload is attempted.
	// NOTE(review): presumably the entry is cleared elsewhere once the upload
	// succeeds -- confirm against uploadFile / the logs package.
	logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)

	furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, GUID: guid, PresignedURL: respURL}
	furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
	if err != nil {
		file.Close()
		log.Printf("Error occurred during request generation: %s\n", err.Error())
		return
	}

	err = uploadFile(furObject, 0)
	if err != nil {
		log.Println(err.Error())
	} else {
		logs.IncrementScore(0)
	}
	// NOTE(review): processSingleUploads also closes this handle after the
	// call returns; the close here is kept so any other caller is unaffected.
	file.Close()
}

func processMultipartUpload(gen3Interface Gen3Interface, multipartFilePaths []string, bucketName string, includeSubDirName bool, uploadPath string) {
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
profileConfig := conf.ParseConfig(profile)
if profileConfig.UseShepherd == "true" ||
profileConfig.UseShepherd == "" && commonUtils.DefaultUseShepherd == true {
log.Fatalf("Error: Shepherd currently does not support multipart uploads. For the moment, please disable Shepherd with\n $ gen3-client configure --profile=%v --use-shepherd=false\nand try again.\n", profile)
}
log.Println("Multipart uploading....")

for _, filePath := range multipartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0, bucketName)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
}
}
4 changes: 3 additions & 1 deletion gen3-client/g3cmd/upload-single.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
func init() {
var guid string
var filePath string
var bucketName string

var uploadSingleCmd = &cobra.Command{
Use: "upload-single",
Expand Down Expand Up @@ -65,7 +66,7 @@ func init() {
}
defer file.Close()

furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid}
furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid, Bucket: bucketName}

furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
Expand Down Expand Up @@ -94,5 +95,6 @@ func init() {
uploadSingleCmd.MarkFlagRequired("guid") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&filePath, "file", "", "Specify file to upload to with --file=~/path/to/file")
uploadSingleCmd.MarkFlagRequired("file") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
RootCmd.AddCommand(uploadSingleCmd)
}
31 changes: 7 additions & 24 deletions gen3-client/g3cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

func init() {
var bucketName string
var includeSubDirName bool
var uploadPath string
var batch bool
Expand Down Expand Up @@ -70,7 +71,7 @@ func init() {
}
fmt.Println()

singlepartFilePaths, multipartFilePaths := validateFilePath(filePaths, forceMultipart)
singlepartFilePaths, multipartFilePaths := separateSingleAndMultipartUploads(filePaths, forceMultipart)

if batch {
workers, respCh, errCh, batchFURObjects := initBatchUploadChannels(numParallel, len(singlepartFilePaths))
Expand All @@ -85,13 +86,13 @@ func init() {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
}
}
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)

if len(errCh) > 0 {
close(errCh)
Expand Down Expand Up @@ -123,7 +124,7 @@ func init() {
continue
}
// The following flow is for singlepart upload flow
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata)
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
log.Println(err.Error())
Expand Down Expand Up @@ -153,26 +154,7 @@ func init() {
if len(multipartFilePaths) > 0 {
// NOTE(@mpingram) - For the moment Shepherd doesn't support multipart uploads.
// Throw an error if Shepherd is enabled and user attempts to multipart upload.
profileConfig := conf.ParseConfig(profile)
if profileConfig.UseShepherd == "true" ||
profileConfig.UseShepherd == "" && commonUtils.DefaultUseShepherd == true {
log.Fatalf("Error: Shepherd currently does not support multipart uploads. For the moment, please disable Shepherd with\n $ gen3-client configure --profile=%v --use-shepherd=false\nand try again.\n", profile)
}
log.Println("Multipart uploading....")
for _, filePath := range multipartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
}
processMultipartUpload(gen3Interface, multipartFilePaths, bucketName, includeSubDirName, uploadPath)
}

if !logs.IsFailedLogMapEmpty() {
Expand All @@ -192,5 +174,6 @@ func init() {
uploadCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
uploadCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload if possible")
uploadCmd.Flags().BoolVar(&hasMetadata, "metadata", false, "Search for and upload file metadata alongside the file")
uploadCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded. If not provided, defaults to Gen3's configured DATA_UPLOAD_BUCKET.")
RootCmd.AddCommand(uploadCmd)
}
Loading
Loading