Skip to content

Commit

Permalink
make sure to apply snapshotMethod in star selects. Add testing.
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Jul 17, 2024
1 parent 2e75c1d commit 4d8919e
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 47 deletions.
118 changes: 87 additions & 31 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vreplication

import (
"encoding/json"
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -52,10 +53,11 @@ type TestTablePlan struct {

func TestBuildPlayerPlan(t *testing.T) {
testcases := []struct {
input *binlogdatapb.Filter
plan *TestReplicatorPlan
planpk *TestReplicatorPlan
err string
input *binlogdatapb.Filter
plan *TestReplicatorPlan
planpk *TestReplicatorPlan
filterSubstrings []string // expect these to be found in the filter query
err string
}{{
// Regular expression
input: &binlogdatapb.Filter{
Expand Down Expand Up @@ -173,15 +175,16 @@ func TestBuildPlayerPlan(t *testing.T) {
// Explicit columns
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, c2 from t2",
Match: "t1",
Filter: "select c1, c2 from t2",
SnapshotMethod: binlogdatapb.StreamerSnapshotMethod_TrackGtids,
}},
},
plan: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t2",
Filter: "select c1, c2 from t2",
Filter: "select /*vt+ snapshotMethod=\"track\" */ c1, c2 from t2",
}},
},
TargetTables: []string{"t1"},
Expand All @@ -202,7 +205,7 @@ func TestBuildPlayerPlan(t *testing.T) {
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t2",
Filter: "select c1, c2, pk1, pk2 from t2",
Filter: "select /*vt+ snapshotMethod=\"track\" */ c1, c2, pk1, pk2 from t2",
}},
},
TargetTables: []string{"t1"},
Expand All @@ -223,15 +226,16 @@ func TestBuildPlayerPlan(t *testing.T) {
// partial group by
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, c2, c3 from t2 group by c3, c1",
Match: "t1",
Filter: "select c1, c2, c3 from t2 group by c3, c1",
SnapshotMethod: binlogdatapb.StreamerSnapshotMethod_LockTables,
}},
},
plan: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t2",
Filter: "select c1, c2, c3 from t2",
Filter: "select /*vt+ snapshotMethod=\"lock\" */ c1, c2, c3 from t2",
}},
},
TargetTables: []string{"t1"},
Expand All @@ -253,7 +257,7 @@ func TestBuildPlayerPlan(t *testing.T) {
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t2",
Filter: "select c1, c2, c3, pk1, pk2 from t2",
Filter: "select /*vt+ snapshotMethod=\"lock\" */ c1, c2, c3, pk1, pk2 from t2",
}},
},
TargetTables: []string{"t1"},
Expand All @@ -275,8 +279,9 @@ func TestBuildPlayerPlan(t *testing.T) {
// full group by
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select c1, c2, c3 from t2 group by c3, c1, c2",
Match: "t1",
Filter: "select c1, c2, c3 from t2 group by c3, c1, c2",
SnapshotMethod: binlogdatapb.StreamerSnapshotMethod_Undefined, // nothing special
}},
},
plan: &TestReplicatorPlan{
Expand Down Expand Up @@ -566,6 +571,47 @@ func TestBuildPlayerPlan(t *testing.T) {
},
},
},
}, {
// Regular expression, snapshotMethod
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
SnapshotMethod: binlogdatapb.StreamerSnapshotMethod_TrackGtids,
}},
},
plan: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select /*vt+ snapshotMethod=\"track\" */ * from t1",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
},
},
},
planpk: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select /*vt+ snapshotMethod=\"track\" */ * from t1",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
},
},
},
filterSubstrings: []string{
`snapshotMethod="track"`,
},
}, {
// syntax error
input: &binlogdatapb.Filter{
Expand Down Expand Up @@ -735,24 +781,34 @@ func TestBuildPlayerPlan(t *testing.T) {
),
}

for _, tcase := range testcases {
plan, err := buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, nil, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
gotErr := ""
if err != nil {
gotErr = err.Error()
}
require.Equal(t, tcase.err, gotErr, "Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
for i, tcase := range testcases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
plan, err := buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, nil, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
gotErr := ""
if err != nil {
gotErr = err.Error()
}
require.Equal(t, tcase.err, gotErr, "Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
for _, s := range tcase.filterSubstrings {
for _, rule := range plan.VStreamFilter.Rules {
assert.Contains(t, rule.Filter, s)
}
for _, tablePlan := range plan.TablePlans {
assert.Contains(t, tablePlan.SendRule.Filter, s)
}
}

plan, err = buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, copyState, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
if err != nil {
continue
}
gotPlan, _ = json.Marshal(plan)
wantPlan, _ = json.Marshal(tcase.planpk)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
plan, err = buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, copyState, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
if err != nil {
return
}
gotPlan, _ = json.Marshal(plan)
wantPlan, _ = json.Marshal(tcase.planpk)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
})
}
}

Expand Down
43 changes: 27 additions & 16 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
sendRule := &binlogdatapb.Rule{
Match: fromTable,
}
commentsList := []string{}
applyComments := func(sel *sqlparser.Select) bool {
if len(commentsList) == 0 {
return false
}
comments := sqlparser.Comments{
fmt.Sprintf(`/*vt+ %s */`, strings.Join(commentsList, " ")),
}
sel.Comments = comments.Parsed()
return true
}

switch rule.SnapshotMethod {
case binlogdatapb.StreamerSnapshotMethod_LockTables:
commentsList = append(commentsList, `snapshotMethod="lock"`)
case binlogdatapb.StreamerSnapshotMethod_TrackGtids:
commentsList = append(commentsList, `snapshotMethod="track"`)
case binlogdatapb.StreamerSnapshotMethod_Undefined:
// leave empty
}

if expr, ok := sel.SelectExprs[0].(*sqlparser.StarExpr); ok {
// If it's a "select *", we return a partial plan, and complete
Expand All @@ -239,7 +259,12 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
if !expr.TableName.IsEmpty() {
return nil, planError(fmt.Errorf("unsupported qualifier for '*' expression"), sqlparser.String(expr))
}
sendRule.Filter = query
if applyComments(sel) {
sendRule.Filter = sqlparser.String(sel)
} else {
sendRule.Filter = query
}

tablePlan := &TablePlan{
TargetName: tableName,
SendRule: sendRule,
Expand Down Expand Up @@ -310,27 +335,13 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
},
})
}
commentsList := []string{}
if rule.SourceUniqueKeyColumns != "" {
commentsList = append(commentsList, fmt.Sprintf(`ukColumns="%s"`, rule.SourceUniqueKeyColumns))
}
if rule.ForceUniqueKey != "" {
commentsList = append(commentsList, fmt.Sprintf(`ukForce="%s"`, rule.ForceUniqueKey))
}
switch rule.SnapshotMethod {
case binlogdatapb.StreamerSnapshotMethod_LockTables:
commentsList = append(commentsList, `snapshotMethod="lock"`)
case binlogdatapb.StreamerSnapshotMethod_TrackGtids:
commentsList = append(commentsList, `snapshotMethod="track"`)
case binlogdatapb.StreamerSnapshotMethod_Undefined:
// leave empty
}
if len(commentsList) > 0 {
comments := sqlparser.Comments{
fmt.Sprintf(`/*vt+ %s */`, strings.Join(commentsList, " ")),
}
tpb.sendSelect.Comments = comments.Parsed()
}
applyComments(tpb.sendSelect)
sendRule.Filter = sqlparser.String(tpb.sendSelect)

tablePlan := tpb.generate()
Expand Down

0 comments on commit 4d8919e

Please sign in to comment.