diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 0beb83036ee..bf40fdad8d5 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -613,7 +613,15 @@ func (be *BuiltinBackupEngine) backupFiles( wg := sync.WaitGroup{} ctxCancel, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // We may still have operations in flight that require a valid context, such as adding files to S3. + // Unless we encountered an error, we should not cancel the context. This is taken care of later + // in the process. If we encountered an error however, we can safely cancel the context as we should + // no longer work on anything and exit fast. + if finalErr != nil { + cancel() + } + }() for i := range fes { wg.Add(1) @@ -634,7 +642,7 @@ func (be *BuiltinBackupEngine) backupFiles( // We check for errors before checking if the context is canceled on purpose, if there was an // error, the context would have been canceled already. if bh.HasErrors() { - params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error()) + params.Logger.Errorf("Failed to backup files due to error: %v", bh.Error()) return } @@ -811,12 +819,15 @@ func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { - ctx, cancel := context.WithCancel(ctx) - defer func() { - if finalErr != nil { - cancel() - } - }() + // We need another context that does not live outside of this function. + // Reporting progress, compressing and writing are operations that will be + // over by the time we exit this function, they can use this cancelable context. + // However, AddFile is something that may continue in the background even after + // this function exits. In this case, we give it the parent context so the caller + // has more control over when to cancel AddFile. + cancelableCtx, cancel := context.WithCancel(ctx) + defer cancel() + // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) @@ -841,7 +852,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara attemptStr := attemptToString(fe.AttemptNb) br := newBackupReader(fe.Name, fi.Size(), timedSource) - go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/, attemptStr) + go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, attemptStr) // Open the destination file for writing, and a buffer. params.Logger.Infof("Backing up file: %v %s", fe.Name, attemptStr) @@ -889,7 +900,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara if backupStorageCompress { var compressor io.WriteCloser if ExternalCompressorCmd != "" { - compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) + compressor, err = newExternalCompressor(cancelableCtx, ExternalCompressorCmd, writer, params.Logger) } else { compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) } @@ -900,7 +911,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) - closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) + closer := ioutil.NewTimeoutCloser(cancelableCtx, compressor, closeTimeout) defer func() { // Close gzip to flush it, after that all data is sent to writer. closeCompressorAt := time.Now()