Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(PPS-411): Add bucket selection support for multi part upload #123

Merged
merged 17 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gen3-client/commonUtils/commonUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type FileUploadRequestObject struct {
PresignedURL string
Request *http.Request
Bar *pb.ProgressBar
Bucket string `json:"bucket,omitempty"`
}

// FileDownloadResponseObject defines a object for file download
Expand Down Expand Up @@ -105,6 +106,7 @@ type RetryObject struct {
GUID string
RetryCount int
Multipart bool
Bucket string
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
}

// ParseRootPath parses dirname that has "~" in the beginning
Expand Down
5 changes: 3 additions & 2 deletions gen3-client/g3cmd/retry-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
var guid string
var presignedURL string
var err error

fmt.Println()
if len(failedLogMap) == 0 {
log.Println("No failed file in log, no need to retry upload.")
Expand Down Expand Up @@ -96,7 +97,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {

if ro.Multipart {
fileInfo := FileInfo{FilePath: ro.FilePath, Filename: ro.Filename}
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount)
err = multipartUpload(gen3Interface, fileInfo, ro.RetryCount, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, ro.GUID, ro.RetryCount, true)
handleFailedRetry(ro, retryObjCh, err, true)
Expand All @@ -109,7 +110,7 @@ func retryUpload(failedLogMap map[string]commonUtils.RetryObject) {
}
}
} else {
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata)
presignedURL, guid, err = GeneratePresignedURL(gen3Interface, ro.Filename, ro.FileMetadata, ro.Bucket)
if err != nil {
updateRetryObject(&ro, ro.FilePath, ro.Filename, ro.FileMetadata, guid, ro.RetryCount, false)
handleFailedRetry(ro, retryObjCh, err, true)
Expand Down
8 changes: 4 additions & 4 deletions gen3-client/g3cmd/upload-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func retry(attempts int, filePath string, guid string, f func() error) (err erro
return fmt.Errorf("After %d attempts, last error: %s", attempts, err)
}

func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error {
func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int, bucketName string) error {
// NOTE @mpingram -- multipartUpload does not yet use the new Shepherd API
// because Shepherd does not yet support multipart uploads.
file, err := os.Open(fileInfo.FilePath)
Expand All @@ -61,7 +61,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return err
}

uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename)
uploadID, guid, err := InitMultipartUpload(g3, fileInfo.Filename, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
Expand All @@ -85,7 +85,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
for chunkIndex := range chunkIndexCh {
var presignedURL string
err = retry(MaxRetryCount, fileInfo.FilePath, guid, func() (err error) {
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex)
presignedURL, err = GenerateMultipartPresignedURL(g3, key, uploadID, chunkIndex, bucketName)
return
})
if err != nil {
Expand Down Expand Up @@ -165,7 +165,7 @@ func multipartUpload(g3 Gen3Interface, fileInfo FileInfo, retryCount int) error
return parts[i].PartNumber < parts[j].PartNumber // sort parts in ascending order
})

if err = CompleteMultipartUpload(g3, key, uploadID, parts); err != nil {
if err = CompleteMultipartUpload(g3, key, uploadID, parts, bucketName); err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, retryCount, true, true)
err = fmt.Errorf("FAILED multipart upload for %s: %s", fileInfo.Filename, err.Error())
return err
Expand Down
190 changes: 157 additions & 33 deletions gen3-client/g3cmd/upload-multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

func init() {
var bucketName string
var manifestPath string
var uploadPath string
var batch bool
Expand All @@ -25,6 +26,8 @@ func init() {
var respCh chan *http.Response
var errCh chan error
var batchFURObjects []commonUtils.FileUploadRequestObject
var forceMultipart bool
var includeSubDirName bool
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved

var uploadMultipleCmd = &cobra.Command{
Use: "upload-multiple",
Expand Down Expand Up @@ -76,50 +79,168 @@ func init() {
log.Fatalln("A valid manifest can be acquired by using the \"Download Manifest\" button on " + dataExplorerURL)
}

furObjects := validateObject(objects, uploadPath)

if batch {
workers, respCh, errCh, batchFURObjects = initBatchUploadChannels(numParallel, len(objects))
}
if forceMultipart {
uploadPath, _ := commonUtils.GetAbsolutePath(uploadPath)
// filePaths, err := commonUtils.ParseFilePaths(uploadPath, false)
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not leave dead code, if we don't need this, remove it

if err != nil {
log.Fatalf("Error when parsing file paths: " + err.Error())
}

for i, furObject := range furObjects {
singlepartFilePaths, multipartFilePaths := separateSingleMultipartUploads(objects, uploadPath, forceMultipart)
if batch {
if len(batchFURObjects) < workers {
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
batchFURObjects = append(batchFURObjects, furObject)
workers, respCh, errCh, batchFURObjects := initBatchUploadChannels(numParallel, len(singlepartFilePaths))
for _, filePath := range singlepartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error: " + err.Error())
continue
}
if len(batchFURObjects) < workers {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
}
}
if i == len(furObjects)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)

if len(errCh) > 0 {
close(errCh)
for err := range errCh {
if err != nil {
log.Printf("Error occurred during uploading: %s\n", err.Error())
}
}
}
} else {
file, err := os.Open(furObject.FilePath)
if err != nil {
log.Println("File open error: " + err.Error())
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
continue
}
defer file.Close()
for _, filePath := range singlepartFilePaths {
file, err := os.Open(filePath)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("File open error: " + err.Error())
continue
}

fi, err := file.Stat()
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("File stat error for file" + fi.Name() + ", file may be missing or unreadable because of permissions.\n")
continue
}
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
// The following flow is for singlepart upload flow
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
log.Println(err.Error())
continue
}
// update failed log with new guid
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)

furObject, err := GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, GUID: guid, PresignedURL: respURL}
furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
file.Close()
log.Printf("Error occurred during request generation: %s\n", err.Error())
continue
}
err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
file.Close()
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
log.Printf("Error occurred during request generation: %s", err.Error())
continue
}
err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
logs.IncrementScore(logs.ScoreBoardLen - 1)
}
// multipart upload for large files here
if len(multipartFilePaths) > 0 {
// NOTE(@mpingram) - For the moment Shepherd doesn't support multipart uploads.
// Throw an error if Shepherd is enabled and user attempts to multipart upload.
profileConfig := conf.ParseConfig(profile)
if profileConfig.UseShepherd == "true" ||
profileConfig.UseShepherd == "" && commonUtils.DefaultUseShepherd == true {
log.Fatalf("Error: Shepherd currently does not support multipart uploads. For the moment, please disable Shepherd with\n $ gen3-client configure --profile=%v --use-shepherd=false\nand try again.\n", profile)
}
log.Println("Multipart uploading....")
for _, filePath := range multipartFilePaths {
fileInfo, err := ProcessFilename(uploadPath, filePath, includeSubDirName, false)
if err != nil {
logs.AddToFailedLog(filePath, filepath.Base(filePath), commonUtils.FileMetadata{}, "", 0, false, true)
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0, bucketName)
if err != nil {
log.Println(err.Error())
} else {
logs.IncrementScore(0)
}
}
}

if !logs.IsFailedLogMapEmpty() {
retryUpload(logs.GetFailedLogMap())
}
logs.PrintScoreBoard()
logs.CloseAll()
} else {
furObjects := validateObject(objects, uploadPath)

if batch {
workers, respCh, errCh, batchFURObjects = initBatchUploadChannels(numParallel, len(objects))
}

for i, furObject := range furObjects {
if batch {
if len(batchFURObjects) < workers {
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
batchFURObjects = append(batchFURObjects, furObject)
}
if i == len(furObjects)-1 { // upload remainders
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
}
} else {
logs.IncrementScore(0)
file, err := os.Open(furObject.FilePath)
if err != nil {
log.Println("File open error: " + err.Error())
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
continue
}
defer file.Close()

furObject, err := GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
file.Close()
logs.AddToFailedLog(furObject.FilePath, furObject.Filename, commonUtils.FileMetadata{}, furObject.GUID, 0, false, true)
logs.IncrementScore(logs.ScoreBoardLen - 1)
log.Printf("Error occurred during request generation: %s", err.Error())
continue
}
err = uploadFile(furObject, 0)
if err != nil {
log.Println(err.Error())
logs.IncrementScore(logs.ScoreBoardLen - 1)
} else {
logs.IncrementScore(0)
}
file.Close()
}
file.Close()
}
}
logs.PrintScoreBoard()
Expand All @@ -135,5 +256,8 @@ func init() {
uploadMultipleCmd.MarkFlagRequired("upload-path") //nolint:errcheck
uploadMultipleCmd.Flags().BoolVar(&batch, "batch", true, "Upload in parallel")
uploadMultipleCmd.Flags().IntVar(&numParallel, "numparallel", 3, "Number of uploads to run in parallel")
uploadMultipleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded")
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
uploadMultipleCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload if possible")
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
uploadMultipleCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
RootCmd.AddCommand(uploadMultipleCmd)
}
4 changes: 3 additions & 1 deletion gen3-client/g3cmd/upload-single.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
func init() {
var guid string
var filePath string
var bucketName string

var uploadSingleCmd = &cobra.Command{
Use: "upload-single",
Expand Down Expand Up @@ -65,7 +66,7 @@ func init() {
}
defer file.Close()

furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid}
furObject := commonUtils.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid, Bucket: bucketName}

furObject, err = GenerateUploadRequest(gen3Interface, furObject, file)
if err != nil {
Expand Down Expand Up @@ -94,5 +95,6 @@ func init() {
uploadSingleCmd.MarkFlagRequired("guid") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&filePath, "file", "", "Specify file to upload to with --file=~/path/to/file")
uploadSingleCmd.MarkFlagRequired("file") //nolint:errcheck
uploadSingleCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded")
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
RootCmd.AddCommand(uploadSingleCmd)
}
10 changes: 6 additions & 4 deletions gen3-client/g3cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

func init() {
var bucketName string
var includeSubDirName bool
var uploadPath string
var batch bool
Expand Down Expand Up @@ -85,13 +86,13 @@ func init() {
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
} else {
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)
batchFURObjects = make([]commonUtils.FileUploadRequestObject, 0)
furObject := commonUtils.FileUploadRequestObject{FilePath: fileInfo.FilePath, Filename: fileInfo.Filename, FileMetadata: fileInfo.FileMetadata, GUID: ""}
batchFURObjects = append(batchFURObjects, furObject)
}
}
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh)
batchUpload(gen3Interface, batchFURObjects, workers, respCh, errCh, bucketName)

if len(errCh) > 0 {
close(errCh)
Expand Down Expand Up @@ -123,7 +124,7 @@ func init() {
continue
}
// The following flow is for singlepart upload flow
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata)
respURL, guid, err := GeneratePresignedURL(gen3Interface, fileInfo.Filename, fileInfo.FileMetadata, bucketName)
if err != nil {
logs.AddToFailedLog(fileInfo.FilePath, fileInfo.Filename, fileInfo.FileMetadata, guid, 0, false, true)
log.Println(err.Error())
Expand Down Expand Up @@ -166,7 +167,7 @@ func init() {
log.Println("Process filename error for file: " + err.Error())
continue
}
err = multipartUpload(gen3Interface, fileInfo, 0)
err = multipartUpload(gen3Interface, fileInfo, 0, bucketName)
if err != nil {
log.Println(err.Error())
} else {
Expand All @@ -192,5 +193,6 @@ func init() {
uploadCmd.Flags().BoolVar(&includeSubDirName, "include-subdirname", false, "Include subdirectory names in file name")
uploadCmd.Flags().BoolVar(&forceMultipart, "force-multipart", false, "Force to use multipart upload if possible")
uploadCmd.Flags().BoolVar(&hasMetadata, "metadata", false, "Search for and upload file metadata alongside the file")
uploadCmd.Flags().StringVar(&bucketName, "bucket", "", "The bucket to which files will be uploaded")
paulineribeyre marked this conversation as resolved.
Show resolved Hide resolved
RootCmd.AddCommand(uploadCmd)
}
Loading