diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index b856572ab7b..403e39cac66 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -45,8 +45,9 @@ import ( // default collation as it has to work across versions and the 8.0 default does not exist in 5.7. var ( // All standard user tables should have a primary key and at least one secondary key. - customerTypes = []string{"'individual'", "'soho'", "'enterprise'"} - customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, + customerTypes = []string{"'individual'", "'soho'", "'enterprise'"} + // We use utf8mb4_general_ci so that we can test with 5.7 and 8.0+. + customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_general_ci, meta json default null, industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, typ enum(%s), sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(%s), key(name)) CHARSET=utf8mb4` diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 7009bade562..6c0f1919616 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -1090,3 +1090,137 @@ func TestVStreamHeartbeats(t *testing.T) { }) } } + +// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly +// when they are specified in the VStream API via the rule.Filter. +// It also confirms that we use the proper collation for the VStream filter when +// using VARCHAR fields. 
+func TestVStreamPushdownFilters(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	setSidecarDBName("_vt")
+	config := *mainClusterConfig
+	vc = NewVitessCluster(t, &clusterOptions{
+		clusterConfig: &config,
+	})
+	defer vc.TearDown()
+	require.NotNil(t, vc)
+	ks := "product"
+	shard := "0"
+	defaultCell := vc.Cells[vc.CellNames[0]]
+
+	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
+	require.NoError(t, err)
+	verifyClusterHealth(t, vc)
+	insertInitialData(t)
+
+	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+	defer vtgateConn.Close()
+
+	// Make sure that we get at least one paul row event in the copy phase.
+	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
+	require.NoError(t, err)
+	res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name = 'pauĺ'", ks), 1, false)
+	require.NoError(t, err)
+	require.Len(t, res.Rows, 1)
+	startingPauls, err := res.Rows[0][0].ToInt()
+	require.NoError(t, err)
+
+	// Coordinate the goroutines: streamCtx bounds the inserts, done stops the reader.
+	streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer streamCancel()
+	done := make(chan struct{})
+
+	// First goroutine that keeps inserting rows into the table being streamed until the
+	// stream context is cancelled or an insert fails.
+	createdPauls := startingPauls
+	createdNonPauls := 0
+	go func() {
+		defer close(done) // Always release the reader loop below, even when an insert fails.
+		for id := 1; ; id++ {
+			select {
+			case <-streamCtx.Done():
+				// Give the VStream a little catch-up time before telling it to stop
+				// via the done channel.
+				time.Sleep(10 * time.Second)
+				return
+			default:
+				if id%10 == 0 {
+					if _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('paÜl')", ks), 1, false); err != nil {
+						t.Errorf("failed to insert paul row: %v", err) // FailNow (require.*) is only valid on the test goroutine.
+						return
+					}
+					createdPauls++
+				} else {
+					insertRow(ks, "customer", id)
+					createdNonPauls++
+				}
+				time.Sleep(10 * time.Millisecond)
+			}
+		}
+	}()
+
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			Keyspace: ks,
+			Shard:    shard,
+			Gtid:     "",
+		}}}
+
+	filter := &binlogdatapb.Filter{
+		Rules: []*binlogdatapb.Rule{{
+			Match:  "customer",
+			Filter: "select * from customer where name = 'påul'",
+		}},
+	}
+	flags := &vtgatepb.VStreamFlags{}
+	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
+	require.NoError(t, err)
+	defer vstreamConn.Close()
+
+	// So we should have at least one paul row event in the copy phase.
+	copyPhaseRowEvents := 0
+	// And we should have many paul row events in the running phase.
+	runningPhaseRowEvents := 0
+	copyPhase := true
+
+	func() {
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+
+			switch err {
+			case nil:
+				for _, ev := range evs {
+					switch ev.Type {
+					case binlogdatapb.VEventType_COPY_COMPLETED:
+						copyPhase = false
+					case binlogdatapb.VEventType_ROW:
+						if copyPhase {
+							copyPhaseRowEvents++
+						} else {
+							runningPhaseRowEvents++
+						}
+					}
+				}
+			default:
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			select {
+			case <-done:
+				return
+			default:
+			}
+		}
+	}()
+
+	require.NotZero(t, createdPauls)
+	require.NotZero(t, createdNonPauls)
+	require.Greater(t, createdNonPauls, createdPauls)
+	require.NotZero(t, copyPhaseRowEvents)
+	require.NotZero(t, runningPhaseRowEvents)
+
+	t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
+	require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
+}
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index a7e4794ba76..dcb5616008d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -213,7 +213,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { defer func() { waitRetryTime = savedWaitRetryTime }() execStatements(t, []string{ - "create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions + "create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb4 collate utf8mb4_general_ci", // Use general_ci so that we have the same behavior across 5.7 and 8.0 "insert into src values('a', 1), ('c', 2)", fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", 
vrepldb), }) @@ -282,7 +282,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { "/update _vt.vreplication set state='Copying'", // Copy mode. "insert into dst(idc,val) values ('a',1)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"a"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"a"}'.*`, // Copy-catchup mode. `/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`, ).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer { @@ -292,11 +292,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { // upd1 := expect. upd1 := expect.Then(qh.Eventually( "insert into dst(idc,val) values ('B',3)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"B"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"B"}'.*`, )) upd2 := expect.Then(qh.Eventually( "insert into dst(idc,val) values ('c',2)", - `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"c"}'.*`, + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"c"}'.*`, )) upd1.Then(upd2.Eventually()) return upd2 diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 1e1d432956d..135984844ef 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -212,7 +212,7 @@ func 
(uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk) filter := uvs.plans[tableName].rule.Filter - log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK) + log.Infof("Starting copyTable for %s, Filter: %s, LastPK: %v", tableName, filter, lastPK) uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName)) err := uvs.vse.StreamRows(ctx, filter, lastPK, func(rows *binlogdatapb.VStreamRowsResponse) error { diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 1293efa3112..6416c5c87de 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -58,6 +58,16 @@ type Plan struct { // of the table. Filters []Filter + // Predicates in the Filter query that we can push down to MySQL + // to reduce the returned rows we need to filter in the VStreamer + // during the copy phase. This will contain any valid expressions + // in the Filter's WHERE clause with the exception of the + // in_keyrange() function which is a filter that must be applied + // by the VStreamer (it's not a valid MySQL function). Note that + // the Filter cannot contain any MySQL functions because the + // VStreamer cannot filter binlog events using them. + whereExprsToPushDown []sqlparser.Expr + // Convert any integer values seen in the binlog events for ENUM or SET // columns to the string values. The map is keyed on the column number, with // the value being the map of ordinal values to string values. @@ -564,6 +574,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if where == nil { return nil } + // Only a series of AND expressions are supported. 
exprs := splitAndExpression(nil, where.Expr) for _, expr := range exprs { switch expr := expr.(type) { @@ -601,10 +612,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if !ok { return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) } - // StrVal is varbinary, we do not support varchar since we would have to implement all collation types - if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal { - return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) - } pv, err := evalengine.Translate(val, &evalengine.Config{ Collation: plan.env.CollationEnv().DefaultConnectionCharset(), Environment: plan.env, @@ -622,7 +629,11 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er ColNum: colnum, Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()), }) + // Add it to the expressions that get pushed down to mysqld. + plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) case *sqlparser.FuncExpr: + // We cannot filter binlog events in VStreamer using MySQL functions so + // we only allow the in_keyrange() function, which is VStreamer specific. if !expr.Name.EqualString("in_keyrange") { return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } @@ -648,6 +659,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er Opcode: IsNotNull, ColNum: colnum, }) + // Add it to the expressions that get pushed down to mysqld. 
+ plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) default: return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go index aba74368802..3c9e19349e4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtenv" @@ -174,6 +173,7 @@ func TestMustSendDDL(t *testing.T) { } func TestPlanBuilder(t *testing.T) { + unicodeCollationID := uint32(collations.MySQL8().DefaultConnectionCharset()) t1 := &Table{ Name: "t1", Fields: []*querypb.Field{{ @@ -183,9 +183,8 @@ func TestPlanBuilder(t *testing.T) { Flags: uint32(querypb.MySqlFlag_NUM_FLAG), }, { Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: collations.CollationUtf8mb4ID, }}, } // t1alt has no id column @@ -194,8 +193,7 @@ func TestPlanBuilder(t *testing.T) { Fields: []*querypb.Field{{ Name: "val", Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Charset: unicodeCollationID, }}, } t2 := &Table{ @@ -253,9 +251,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }}, env: vtenv.NewTestEnv(), @@ -276,9 +273,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: 
collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }}, Filters: []Filter{{ @@ -307,9 +303,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }}, env: vtenv.NewTestEnv(), @@ -330,9 +325,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }}, env: vtenv.NewTestEnv(), @@ -345,9 +339,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { ColNum: 0, @@ -368,9 +361,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { ColNum: 0, @@ -399,9 +391,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { ColNum: 0, @@ -430,9 +421,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { ColNum: 0, @@ -451,25 +441,24 @@ func 
TestPlanBuilder(t *testing.T) { VindexColumns: nil, KeyRange: nil, }}, + whereExprsToPushDown: []sqlparser.Expr{ + sqlparser.NewComparisonExpr(sqlparser.EqualOp, sqlparser.Expr(sqlparser.NewColName("id")), sqlparser.Expr(sqlparser.NewIntLiteral("1")), nil), + }, env: vtenv.NewTestEnv(), }, }, { - inTable: t2, - inRule: &binlogdatapb.Rule{Match: "/t1/"}, - }, { - inTable: regional, - inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select val, id from regional where in_keyrange('-80')"}, + inTable: t1, + inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where val > 'hey' and val < 'there'"}, outPlan: &Plan{ ColExprs: []ColExpr{{ - ColNum: 2, + ColNum: 1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { - ColNum: 1, + ColNum: 0, Field: &querypb.Field{ Name: "id", Type: sqltypes.Int64, @@ -477,14 +466,28 @@ func TestPlanBuilder(t *testing.T) { Flags: uint32(querypb.MySqlFlag_NUM_FLAG), }, }}, - Filters: []Filter{{ - Opcode: VindexMatch, - ColNum: 0, - Value: sqltypes.NULL, - Vindex: nil, - VindexColumns: []int{0, 1}, - KeyRange: nil, - }}, + Filters: []Filter{ + { + Opcode: GreaterThan, + ColNum: 1, + Value: sqltypes.NewVarChar("hey"), + Vindex: nil, + VindexColumns: nil, + KeyRange: nil, + }, + { + Opcode: LessThan, + ColNum: 1, + Value: sqltypes.NewVarChar("there"), + Vindex: nil, + VindexColumns: nil, + KeyRange: nil, + }, + }, + whereExprsToPushDown: []sqlparser.Expr{ + sqlparser.NewComparisonExpr(sqlparser.GreaterThanOp, sqlparser.Expr(sqlparser.NewColName("val")), sqlparser.Expr(sqlparser.NewStrLiteral("hey")), nil), + sqlparser.NewComparisonExpr(sqlparser.LessThanOp, sqlparser.Expr(sqlparser.NewColName("val")), sqlparser.Expr(sqlparser.NewStrLiteral("there")), nil), + }, env: vtenv.NewTestEnv(), }, }, { @@ -495,9 +498,8 @@ func TestPlanBuilder(t *testing.T) { ColNum: 
1, Field: &querypb.Field{ Name: "val", - Type: sqltypes.VarBinary, - Charset: collations.CollationBinaryID, - Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + Type: sqltypes.VarChar, + Charset: unicodeCollationID, }, }, { ColNum: 0, @@ -511,6 +513,40 @@ func TestPlanBuilder(t *testing.T) { convertUsingUTF8Columns: map[string]bool{"val": true}, env: vtenv.NewTestEnv(), }, + }, { + inTable: t2, + inRule: &binlogdatapb.Rule{Match: "/t1/"}, + }, { + inTable: regional, + inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select val, id from regional where in_keyrange('-80')"}, + outPlan: &Plan{ + ColExprs: []ColExpr{{ + ColNum: 2, + Field: &querypb.Field{ + Name: "val", + Type: sqltypes.VarBinary, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_BINARY_FLAG), + }, + }, { + ColNum: 1, + Field: &querypb.Field{ + Name: "id", + Type: sqltypes.Int64, + Charset: collations.CollationBinaryID, + Flags: uint32(querypb.MySqlFlag_NUM_FLAG), + }, + }}, + Filters: []Filter{{ + Opcode: VindexMatch, + ColNum: 0, + Value: sqltypes.NULL, + Vindex: nil, + VindexColumns: []int{0, 1}, + KeyRange: nil, + }}, + env: vtenv.NewTestEnv(), + }, }, { inTable: regional, inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select id, keyspace_id() from regional"}, @@ -661,7 +697,7 @@ func TestPlanBuilder(t *testing.T) { plan.Filters[ind].Vindex = nil plan.Filters[ind].Vindex = nil } - utils.MustMatch(t, tcase.outPlan, plan) + require.EqualValues(t, tcase.outPlan, plan) }) } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 317936e4289..7f63a90650d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -260,13 +260,25 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error } prefix = ", " } + + addPushdownExpressions := func() { + for i, expr := range rs.plan.whereExprsToPushDown { + if i != 
0 { + // Only AND expressions are supported. + buf.Myprintf(" and ") + } + buf.Myprintf("(%s)", sqlparser.String(expr)) + } + } // If we know the index name that we should be using then tell MySQL // to use it if possible. This helps to ensure that we are able to // leverage the ordering from the index itself and avoid having to // do a FILESORT of all the results. This index should contain all // of the PK columns which are used in the ORDER BY clause below. var indexHint string - if st.PKIndexName != "" { + // If we're pushing down any expressions, we need to let the optimizer + // choose the best index to use. + if st.PKIndexName != "" && len(rs.plan.whereExprsToPushDown) == 0 { escapedPKIndexName, err := sqlescape.EnsureEscaped(st.PKIndexName) if err != nil { return "", err @@ -274,12 +286,18 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName) } buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint) - if len(rs.lastpk) != 0 { + if len(rs.lastpk) != 0 { // We're in the Nth copy phase cycle and need to resume if len(rs.lastpk) != len(rs.pkColumns) { return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)", st.Name, rs.lastpk, rs.pkColumns) } buf.WriteString(" where ") + // First we add any predicates that should be pushed down. + if len(rs.plan.whereExprsToPushDown) > 0 { + addPushdownExpressions() + // Only AND expressions are supported. + buf.Myprintf(" and ") + } prefix := "" // This loop handles the case for composite PKs. 
For example, // if lastpk was (1,2), the where clause would be: @@ -298,6 +316,9 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error rs.lastpk[lastcol].EncodeSQL(buf) buf.Myprintf(")") } + } else if len(rs.plan.whereExprsToPushDown) > 0 { // We're in the first copy phase cycle + buf.Myprintf(" where ") + addPushdownExpressions() } buf.Myprintf(" order by ", sqlparser.NewIdentifierCS(rs.plan.Table.Name)) prefix = "" diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 47db5fb839a..97ed4093c8f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -23,14 +23,13 @@ import ( "strconv" "testing" - vttablet "vitess.io/vitess/go/vt/vttablet/common" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + vttablet "vitess.io/vitess/go/vt/vttablet/common" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -343,10 +342,10 @@ func TestStreamRowsFilterInt(t *testing.T) { wantStream := []string{ `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, - `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"5"}`, + `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"4"}`, } - wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t1 force index (`PRIMARY`) order by id1" - checkStream(t, "select id1, val from t1 where id2 = 100", nil, wantQuery, wantStream) + 
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t1 where (id2 = 100) order by id1" + checkStream(t, "select id1, val from t1 where (id2 = 100)", nil, wantQuery, wantStream) require.Equal(t, int64(0), engine.rowStreamerNumPackets.Get()) require.Equal(t, int64(2), engine.rowStreamerNumRows.Get()) require.Less(t, int64(0), engine.vstreamerPacketSize.Get()) @@ -373,10 +372,35 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { wantStream := []string{ `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, - `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"6"}`, + `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"5"}`, } - wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 force index (`PRIMARY`) order by id1" - checkStream(t, "select id1, val from t1 where val = 'newton'", nil, wantQuery, wantStream) + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where (val = 'newton') order by id1" + checkStream(t, "select id1, val from t1 where (val = 'newton')", nil, wantQuery, wantStream) +} + +func TestStreamRowsFilterVarChar(t *testing.T) { + if testing.Short() { + t.Skip() + } + + err := env.SetVSchema(shardedVSchema) + require.NoError(t, err) + defer env.SetVSchema("{}") + + execStatements(t, []string{ + "create table t1(id1 int, val varchar(128), primary key(id1)) character set utf8mb4 collate utf8mb4_general_ci", // Use general_ci so that we have the same behavior across 5.7 and 8.0 + "insert into t1 
values (1,'kepler'), (2, 'Ñewton'), (3, 'nEwton'), (4, 'kepler'), (5, 'neẅton'), (6, 'kepler')", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + + wantStream := []string{ + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARCHAR table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:512 charset:45 column_type:"varchar(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, + `rows:{lengths:1 lengths:7 values:"2Ñewton"} rows:{lengths:1 lengths:6 values:"3nEwton"} rows:{lengths:1 lengths:8 values:"5neẅton"} lastpk:{lengths:1 values:"5"}`, + } + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where (val = 'newton') order by id1" + checkStream(t, "select id1, val from t1 where (val = 'newton')", nil, wantQuery, wantStream) } func TestStreamRowsMultiPacket(t *testing.T) {