diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 3617b2637c8..486df025d16 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -66,6 +66,22 @@ const tabletPickerContextTimeout = 90 * time.Second // ending the stream from the tablet. const stopOnReshardDelay = 500 * time.Millisecond +const partialJournalingParticipantsMsg = "not all journaling participants are in the stream" + +type partialJournalingParticipantsError struct { + details string +} + +func (p *partialJournalingParticipantsError) Error() string { + return fmt.Sprintf("%s: %s", partialJournalingParticipantsMsg, p.details) +} + +func NewPartialJournalingParticipantsError(details string) error { + return &partialJournalingParticipantsError{ + details: details, + } +} + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -784,15 +800,27 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // A tablet should be ignored upon retry if it's likely another tablet will not // produce the same error. func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { + // Not having all journaling participants available is considered fatal and + // requires a new VStream. + if _, ok := vterrors.UnwrapAll(err).(*partialJournalingParticipantsError); ok { + return false, false + } + errCode := vterrors.Code(err) - // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate - // list in the TabletPicker on retry. - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { - return true, true + // This typically indicates that the user provided invalid arguments for the + // VStream so we should not retry. + if errCode == vtrpcpb.Code_INVALID_ARGUMENT { + // But if there is a GTIDSet Mismatch on the tablet, omit that tablet from + // the candidate list in the TabletPicker and retry. The argument was invalid + // *for that specific *tablet* but it's not generally invalid. + if strings.Contains(err.Error(), "GTIDSet Mismatch") { + return true, true + } + return false, false } - // If this is a recoverable/ephemeral error, then retry. + // For anything else, if this is a recoverable/ephemeral error then retry. if !vreplication.IsUnrecoverableError(err) { return true, false } @@ -938,7 +966,7 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar mode = matchAll je.participants[inner] = false case matchNone: - return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids) + return nil, NewPartialJournalingParticipantsError(fmt.Sprintf("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)) } continue nextParticipant } @@ -947,7 +975,7 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar case undecided, matchNone: mode = matchNone case matchAll: - return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids) + return nil, NewPartialJournalingParticipantsError(fmt.Sprintf("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)) } } if mode == matchNone { diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 4e10e60c758..2a718fb7497 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -402,7 +402,7 @@ func TestVStreamRetriableErrors(t *testing.T) { name: "failed precondition", code: vtrpcpb.Code_FAILED_PRECONDITION, msg: "", - shouldRetry: true, + shouldRetry: false, ignoreTablet: false, }, { @@ -420,7 +420,7 @@ func TestVStreamRetriableErrors(t *testing.T) { ignoreTablet: false, }, { - name: "should not retry", + name: "invalid argument", code: vtrpcpb.Code_INVALID_ARGUMENT, msg: "final error", shouldRetry: false, @@ -928,7 +928,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Variable names are maintained like in OneToMany, but order is different.1 + // Variable names are maintained like in OneToMany, but order is different. ks := "TestVStream" cell := "aa" _ = createSandbox(ks)