Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Support for BETWEEN/NOT BETWEEN filter in VStream #17721

Merged
merged 13 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 223 additions & 17 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ func TestVStreamPushdownFilters(t *testing.T) {
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

Expand Down Expand Up @@ -1173,19 +1173,233 @@ func TestVStreamPushdownFilters(t *testing.T) {
Filter: "select * from customer where name = 'påul'",
}},
}
flags := &vtgatepb.VStreamFlags{}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// So we should have at least one paul row event in the copy phase.
copyPhaseRowEvents := 0
// And we should have many paul row events in the running phase.
runningPhaseRowEvents := 0
copyPhase := true
// So we should have at least one paul row event in the copy phase, and
// we should have many paul row events in the running phase.
copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}

// TestVStreamPushdownBetweenFilter is same as TestVStreamPushdownFilters, but it tests `BETWEEN`.
func TestVStreamPushdownBetweenFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
setSidecarDBName("_vt")
config := *mainClusterConfig
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()
require.NotNil(t, vc)
ks := "product"
shard := "0"
defaultCell := vc.Cells[vc.CellNames[0]]

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)
insertInitialData(t)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name between 'a' and 'c'", ks), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
startingRowCount, err := res.Rows[0][0].ToInt()
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
createdBetweenRows := startingRowCount
createdNonBetweenRows := 0
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
if id%10 == 0 {
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
createdBetweenRows += 3
} else {
insertRow(ks, "customer", id)
createdNonBetweenRows++
}
time.Sleep(10 * time.Millisecond)
id++
}
}
}()

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer where name between 'a' and 'c'",
}},
}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdBetweenRows)
require.NotZero(t, createdNonBetweenRows)
require.Greater(t, createdNonBetweenRows, createdBetweenRows)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

require.Equal(t, createdBetweenRows, copyPhaseRowEvents+runningPhaseRowEvents)
}

// TestVStreamPushdownBetweenFilter is same as TestVStreamPushdownFilters, but it tests `IN`.
func TestVStreamPushdownInFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
setSidecarDBName("_vt")
config := *mainClusterConfig
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()
require.NotNil(t, vc)
ks := "product"
shard := "0"
defaultCell := vc.Cells[vc.CellNames[0]]

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)
insertInitialData(t)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name in ('a', 'b', 'c')", ks), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
startingRowCount, err := res.Rows[0][0].ToInt()
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
createdInRows := startingRowCount
createdNonInRows := 0
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
if id%10 == 0 {
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('a')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('b')", ks), 1, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('c')", ks), 1, false)
require.NoError(t, err)
createdInRows += 3
} else {
insertRow(ks, "customer", id)
createdNonInRows++
}
time.Sleep(10 * time.Millisecond)
id++
}
}
}()

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer where name in ('a', 'b', 'c')",
}},
}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdInRows)
require.NotZero(t, createdNonInRows)
require.Greater(t, createdNonInRows, createdInRows)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

require.Equal(t, createdInRows, copyPhaseRowEvents+runningPhaseRowEvents)
}

// runVStreamAndGetNumOfRowEvents runs VStream with the specified filter and
// returns number of copy phase and running phase row events.
func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamConn *vtgateconn.VTGateConn,
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -1214,13 +1428,5 @@ func TestVStreamPushdownFilters(t *testing.T) {
}
}
}()

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
return
}
Loading
Loading