Skip to content

Commit

Permalink
VReplication: Improve error handling in VTGate VStreams (#17558)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Feb 11, 2025
1 parent 2b21fb4 commit cca7b7f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 69 deletions.
19 changes: 11 additions & 8 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -76,11 +77,13 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
done := false
done := atomic.Bool{}
done.Store(false)

// don't insert while PRS is going on
var insertMu sync.Mutex
stopInserting := false
stopInserting := atomic.Bool{}
stopInserting.Store(false)
id := 0

vtgateConn := vc.GetVTGateConn(t)
Expand All @@ -89,7 +92,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
go func() {
for {
if stopInserting {
if stopInserting.Load() {
return
}
insertMu.Lock()
Expand Down Expand Up @@ -121,7 +124,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
log.Infof("%s:: remote error: %v", time.Now(), err)
}

if done {
if done.Load() {
return
}
}
Expand Down Expand Up @@ -153,12 +156,12 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
require.NoError(t, err)
}
time.Sleep(100 * time.Millisecond)
stopInserting = true
time.Sleep(2 * time.Second)
done = true
stopInserting.Store(true)
time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup
done.Store(true)
}

if done {
if done.Load() {
break
}
}
Expand Down
Loading

0 comments on commit cca7b7f

Please sign in to comment.