diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index b9afc876e74..75b6aaf64c9 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -1093,8 +1093,8 @@ func TestVStreamHeartbeats(t *testing.T) { // TestVStreamPushdownFilters confirms that pushdown filters are applied correctly // when they are specified in the VStream API via the rule.Filter. -// It also confirms that we use the proper collations for the vstream filter when -// using varchar fields. +// It also confirms that we use the proper collation for the VStream filter when +// using VARCHAR fields. func TestVStreamPushdownFilters(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -1118,7 +1118,7 @@ func TestVStreamPushdownFilters(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t, vc) - // Make sure that we get at least one paul event in the copy phase. + // Make sure that we get at least one paul row event in the copy phase. _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false) require.NoError(t, err) diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 1d62053a28c..b8a86b94de5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -214,7 +213,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum filter := rule.Filter query := filter - log.Errorf("DEBUG: building table plan for %s with filter: %s", tableName, filter) // generate equivalent select statement if filter is empty or a keyrange. switch { case filter == "": diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 1b7d9d6b69b..e15e70b9fba 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -58,12 +58,14 @@ type Plan struct { // of the table. Filters []Filter - // Predicates in the Filter query that we can push down to - // MySQL to reduce the returned rows we need to filter. - // This will contain any valid expressions in the Filter's - // WHERE clause with the exception of the in_keyrange() - // function which is a filter that must be applied by the - // vstreamer (it's not a valid MySQL function). + // Predicates in the Filter query that we can push down to MySQL + // to reduce the returned rows we need to filter in the VStreamer + // during the copy phase. This will contain any valid expressions + // in the Filter's WHERE clause with the exception of the + // in_keyrange() function which is a filter that must be applied + // by the VStreamer (it's not a valid MySQL function). Note that + // the Filter cannot contain any MySQL functions because the + // VStreamer cannot filter binlog events using them. whereExprsToPushDown []sqlparser.Expr // Convert any integer values seen in the binlog events for ENUM or SET @@ -572,9 +574,9 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if where == nil { return nil } + // Only a series of AND expressions are supported. exprs := splitAndExpression(nil, where.Expr) for _, expr := range exprs { - log.Errorf("DEBUG: analyzing where expression of type %T: %v", expr, sqlparser.String(expr)) switch expr := expr.(type) { case *sqlparser.ComparisonExpr: opcode, err := getOpcode(expr) @@ -628,11 +630,10 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()), }) // Add it to the expressions that get pushed down to mysqld. - log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr)) plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) case *sqlparser.FuncExpr: - // We cannot filter binlog events in vstreamer using MySQL functions so - // we only allow the in_keyrange() function, which is vstreamer specific. + // We cannot filter binlog events in VStreamer using MySQL functions so + // we only allow the in_keyrange() function, which is VStreamer specific. if !expr.Name.EqualString("in_keyrange") { return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } @@ -658,7 +659,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er Opcode: IsNotNull, ColNum: colnum, }) - log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr)) // Add it to the expressions that get pushed down to mysqld. plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) default: diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index bc22bfa482d..b4ba5b69155 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -149,7 +149,6 @@ func (rs *rowStreamer) Stream() error { func (rs *rowStreamer) buildPlan() error { // This pre-parsing is required to extract the table name // and create its metadata. - log.Errorf("DEBUG: building rowstreamer plan from query: %s", rs.query) sel, fromTable, err := analyzeSelect(rs.query, rs.se.Environment().Parser()) if err != nil { return err @@ -177,7 +176,6 @@ func (rs *rowStreamer) buildPlan() error { log.Errorf("%s", err.Error()) return err } - log.Errorf("DEBUG: rowstreamer plan: %+v", rs.plan) directives := sel.Comments.Directives() if s, found := directives.GetString("ukColumns", ""); found { @@ -200,7 +198,6 @@ func (rs *rowStreamer) buildPlan() error { if err != nil { return err } - log.Errorf("DEBUG: rowstreamer final query: %s", rs.sendQuery) return err } @@ -269,7 +266,6 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error if i != 0 { buf.Myprintf(" and ") } - log.Errorf("DEBUG: adding pushdown expression to rowstreamer query: %s", sqlparser.String(expr)) buf.Myprintf("%s", sqlparser.String(expr)) } } @@ -289,15 +285,16 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName) } buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint) - if len(rs.lastpk) != 0 { + if len(rs.lastpk) != 0 { // We're in the copy phase and need to resume if len(rs.lastpk) != len(rs.pkColumns) { return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)", st.Name, rs.lastpk, rs.pkColumns) } buf.WriteString(" where ") - addPushdownExpressions() // First we add any predicates that should be pushed down. + addPushdownExpressions() if len(rs.plan.whereExprsToPushDown) > 0 { + // Only AND expressions are supported. buf.Myprintf(" and ") } prefix := "" @@ -312,13 +309,14 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error for i, pk := range rs.pkColumns[:lastcol] { buf.Myprintf("%v = ", sqlparser.NewIdentifierCI(rs.plan.Table.Fields[pk].Name)) rs.lastpk[i].EncodeSQL(buf) + // Only AND expressions are supported. buf.Myprintf(" and ") } buf.Myprintf("%v > ", sqlparser.NewIdentifierCI(rs.plan.Table.Fields[rs.pkColumns[lastcol]].Name)) rs.lastpk[lastcol].EncodeSQL(buf) buf.Myprintf(")") } - } else if len(rs.plan.whereExprsToPushDown) > 0 { + } else if len(rs.plan.whereExprsToPushDown) > 0 { // We're in the running/replicating phase buf.Myprintf(" where ") addPushdownExpressions() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 5e68b5ec258..ea475d19676 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -167,7 +167,6 @@ func (uvs *uvstreamer) buildTablePlan() error { if rule == nil { continue } - log.Errorf("DEBUG: building table plan for table %s, with filter: %s", tableName, rule.Filter) plan := &tablePlan{ tablePK: nil, rule: &binlogdatapb.Rule{