Skip to content

Commit

Permalink
VReplication: Align VReplication and VTGate VStream Retry Logic (#17783)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Feb 19, 2025
1 parent 83e2a4f commit 5c27b40
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 17 deletions.
32 changes: 24 additions & 8 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"golang.org/x/exp/maps"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -798,20 +799,35 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
errCode := vterrors.Code(err)

// In this context, where we will run the tablet picker again on retry, these
// codes indicate that it's worth a retry as the error is likely a transient
// one with a tablet or within the shard.
if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
// This typically indicates that the user provided invalid arguments for the
// VStream so we should not retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT {
// But if there is a GTIDSet Mismatch on the tablet, omit that tablet from
// the candidate list in the TabletPicker and retry. The argument was invalid
// *for that specific tablet* but it's not generally invalid.
if strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}
return false, false
}
// Internal errors such as not having all journaling participants require a new
// VStream.
if errCode == vtrpcpb.Code_INTERNAL {
return false, false
}

return false, false
// For anything else, if this is an ephemeral SQL error -- such as a
// MAX_EXECUTION_TIME SQL error during the copy phase -- or any other
// type of non-SQL error, then retry.
return sqlerror.IsEphemeralError(err), false
}

// sendAll sends a group of events together while holding the lock.
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,19 @@ func TestVStreamRetriableErrors(t *testing.T) {
ignoreTablet: false,
},
{
name: "should not retry",
name: "invalid argument",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
ignoreTablet: false,
},
{
name: "query interrupted",
code: vtrpcpb.Code_UNKNOWN,
msg: "vttablet: rpc error: code = Unknown desc = Query execution was interrupted, maximum statement execution time exceeded (errno 3024) (sqlstate HY000)",
shouldRetry: true,
ignoreTablet: false,
},
}

commit := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -928,7 +935,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Variable names are maintained like in OneToMany, but order is different.1
// Variable names are maintained like in OneToMany, but order is different.
ks := "TestVStream"
cell := "aa"
_ = createSandbox(ks)
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, para
insertLog(dbClient, action, vreplID, params["state"], message)
}

// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate.
// isUnrecoverableError returns true if vreplication cannot recover from the given error and
// should completely terminate.
func isUnrecoverableError(err error) bool {
if err == nil {
return false
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/vreplication/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"strings"
"testing"

vttablet "vitess.io/vitess/go/vt/vttablet/common"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/vterrors"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
"time"

"vitess.io/vitess/go/mysql/replication"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var uvstreamerTestMode = false // Only used for testing
Expand Down

0 comments on commit 5c27b40

Please sign in to comment.