Skip to content

Commit

Permalink
Further minor improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 24, 2024
1 parent 996a9f4 commit fe79218
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// stopOnReshardDelay is how long we wait after sending a reshard journal event before ending the stream
// from the tablet.
// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before
// ending the stream from the tablet.
const stopOnReshardDelay = 2 * time.Second

// vstream contains the metadata for one VStream request.
Expand Down Expand Up @@ -671,8 +671,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
// an autocommit insert into the _vt.resharding_journal table on the
// tablet. This batch of events we're currently processing may not
// contain these events.
// tablet.
for j := i + 1; j < len(events); j++ {
sendevents = append(sendevents, events[j])
if events[j].Type == binlogdatapb.VEventType_COMMIT {
Expand All @@ -691,19 +690,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}
if je != nil {
// Wait till all other participants converge and return EOF.
// We're going to be ending the tablet stream, so we ensure a reasonable
// minimum amount of time is alloted for clients to Recv the journal event
// before the stream's context is cancelled (which would cause the grpc
// Send or Recv to fail). If the client doesn't (grpc) Recv the journal
// event before the stream ends then they'll have to resume from the last
// ShardGtid they received before the journal event.
endTimer := time.NewTimer(stopOnReshardDelay)
defer endTimer.Stop()
// Wait until all other participants converge and then return EOF after
// the minimum delay has passed.
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
case <-journalDone:
// We're going to be ending the tablet stream anyway, so we pause to give
// clients time to recv the journal event before the stream's context is
// cancelled (which causes the grpc SendMsg to fail).
// If the client doesn't (grpc) Recv the journal event before the stream
// ends then they'll have to resume from the last ShardGtid they received
// before the journal event.
time.Sleep(stopOnReshardDelay)
<-endTimer.C
return io.EOF
}
}
Expand Down

0 comments on commit fe79218

Please sign in to comment.