diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index a300a3dcbc8..1afc0a7cf14 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -61,8 +61,8 @@ const maxSkewTimeoutSeconds = 10 * 60 // for a vstream const tabletPickerContextTimeout = 90 * time.Second -// stopOnReshardDelay is how long we wait after sending a reshard journal event before ending the stream -// from the tablet. +// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before +// ending the stream from the tablet. const stopOnReshardDelay = 2 * time.Second // vstream contains the metadata for one VStream request. @@ -671,8 +671,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Read any subsequent events until we get the VGTID->COMMIT events that // always follow the JOURNAL event which is generated as a result of // an autocommit insert into the _vt.resharding_journal table on the - // tablet. This batch of events we're currently processing may not - // contain these events. + // tablet. for j := i + 1; j < len(events); j++ { sendevents = append(sendevents, events[j]) if events[j].Type == binlogdatapb.VEventType_COMMIT { @@ -691,19 +690,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } if je != nil { - // Wait till all other participants converge and return EOF. + // We're going to be ending the tablet stream, so we ensure a reasonable + // minimum amount of time is allotted for clients to Recv the journal event + // before the stream's context is cancelled (which would cause the grpc + // Send or Recv to fail). If the client doesn't (grpc) Recv the journal + // event before the stream ends then they'll have to resume from the last + // ShardGtid they received before the journal event. 
+ endTimer := time.NewTimer(stopOnReshardDelay) + defer endTimer.Stop() + // Wait until all other participants converge and then return EOF after + // the minimum delay has passed. journalDone = je.done select { case <-ctx.Done(): return ctx.Err() case <-journalDone: - // We're going to be ending the tablet stream anyway, so we pause to give - // clients time to recv the journal event before the stream's context is - // cancelled (which causes the grpc SendMsg to fail). - // If the client doesn't (grpc) Recv the journal event before the stream - // ends then they'll have to resume from the last ShardGtid they received - // before the journal event. - time.Sleep(stopOnReshardDelay) + <-endTimer.C return io.EOF } }