Skip to content

Commit

Permalink
Use LastError & exponential backoff
Browse files Browse the repository at this point in the history
Signed-off-by: twthorn <thomaswilliamthornton@gmail.com>
  • Loading branch information
twthorn committed Aug 6, 2024
1 parent ab08158 commit 293f586
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtadmin/cluster/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/debug"
"vitess.io/vitess/go/vt/vtadmin/internal/backoff"
"vitess.io/vitess/go/vt/vterrors/backoff"
)

const logPrefix = "[vtadmin.cluster.resolver]"
Expand Down
File renamed without changes.
File renamed without changes.
67 changes: 51 additions & 16 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"golang.org/x/exp/maps"
grpcbackoff "google.golang.org/grpc/backoff"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
Expand All @@ -36,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vterrors/backoff"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -46,9 +48,11 @@ import (

// vstreamManager manages vstream requests.
type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
maxTimeInError time.Duration
baseRetryDelay time.Duration

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand All @@ -61,6 +65,9 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// Default max time a tablet with the same error should be retried before retrying another.
const defaultMaxTimeInError = 5 * time.Minute

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -122,6 +129,9 @@ type vstream struct {
ts *topo.Server

tabletPickerOptions discovery.TabletPickerOptions

lastError *vterrors.LastError
backoffStrategy backoff.Strategy
}

type journalEvent struct {
Expand All @@ -134,9 +144,11 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
maxTimeInError: defaultMaxTimeInError,
baseRetryDelay: grpcbackoff.DefaultConfig.BaseDelay,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -183,10 +195,26 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
},
lastError: initLastError(vsm.maxTimeInError),
backoffStrategy: initBackOffStrategy(vsm.baseRetryDelay),
}
return vs.stream(ctx)
}

func initLastError(maxTimeInError time.Duration) *vterrors.LastError {
return vterrors.NewLastError("VStreamManager", maxTimeInError)
}

func initBackOffStrategy(retryDelay time.Duration) backoff.Strategy {
config := grpcbackoff.Config{
BaseDelay: retryDelay,
Multiplier: grpcbackoff.DefaultConfig.Multiplier,
Jitter: grpcbackoff.DefaultConfig.Jitter,
MaxDelay: grpcbackoff.DefaultConfig.MaxDelay,
}
return backoff.Get("exponential", config)
}

// resolveParams provides defaults for the inputs if they're not specified.
func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) {
Expand Down Expand Up @@ -484,7 +512,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)

errCount := 0
backoffIndex := 0
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -529,6 +557,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()
tablet, err := tp.PickForStreaming(tpCtx)

if err != nil {
return tabletPickerErr(err)
}
Expand Down Expand Up @@ -585,8 +614,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
var vstreamCreatedOnce sync.Once
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
// We received a valid event. Reset backoff index.
backoffIndex = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

Expand Down Expand Up @@ -708,15 +737,21 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}

ignoreTablets = append(ignoreTablets, tablet.GetAlias())
vs.lastError.Record(err)

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
if vs.lastError.ShouldRetry() {
log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname())
retryDelay := vs.backoffStrategy.Backoff(backoffIndex)
backoffIndex++
time.Sleep(retryDelay)
} else {
log.Infof("Adding tablet to ignore list, alias: %v, hostname: %s", tablet.GetAlias(), tablet.GetHostname())
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
vs.lastError = initLastError(vs.vsm.maxTimeInError)
backoffIndex = 0
}
log.Infof("vstream for %s/%s error, retrying, count: %d, error: %v", sgtid.Keyspace, sgtid.Shard, errCount, err)

log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
}

Expand Down
61 changes: 44 additions & 17 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,31 +390,42 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {

func TestVStreamRetriableErrors(t *testing.T) {
type testCase struct {
name string
code vtrpcpb.Code
msg string
name string
code vtrpcpb.Code
msg string
shouldSwitchTablets bool
}

tcases := []testCase{
{
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldSwitchTablets: false,
},
{
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldSwitchTablets: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldSwitchTablets: false,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "a different unavailable error that persists",
shouldSwitchTablets: true,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldSwitchTablets: false,
},
}

Expand Down Expand Up @@ -446,9 +457,25 @@ func TestVStreamRetriableErrors(t *testing.T) {

vsm := newTestVStreamManager(ctx, hc, st, cells[0])

// Always have the local cell tablet error so it's ignored on retry and we pick the other one
if tcase.shouldSwitchTablets {
// Retry just once before trying another tablet.
vsm.maxTimeInError = 1 * time.Nanosecond
vsm.baseRetryDelay = 1 * time.Millisecond
} else {
// Retry at least once on the same tablet.
vsm.maxTimeInError = 1 * time.Second
vsm.baseRetryDelay = 1 * time.Nanosecond
}

// Always have the local cell tablet error on its first vstream
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))
sbc1.AddVStreamEvents(commit, nil)
if tcase.shouldSwitchTablets {
// Add desired events to the new tablet we should switch to.
sbc1.AddVStreamEvents(commit, nil)
} else {
// Add desired events to the original tablet. We must retry on the same tablet to obtain them.
sbc0.AddVStreamEvents(commit, nil)
}

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Expand Down

0 comments on commit 293f586

Please sign in to comment.