Skip to content

Commit

Permalink
Minor tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 1, 2025
1 parent b87608b commit d708223
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 24 deletions.
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ func TestVStreamHeartbeats(t *testing.T) {

// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly
// when they are specified in the VStream API via the rule.Filter.
// It also confirms that we use the proper collations for the vstream filter when
// using varchar fields.
// It also confirms that we use the proper collation for the VStream filter when
// using VARCHAR fields.
func TestVStreamPushdownFilters(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
Expand All @@ -1118,7 +1118,7 @@ func TestVStreamPushdownFilters(t *testing.T) {
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

// Make sure that we get at least one paul event in the copy phase.
// Make sure that we get at least one paul row event in the copy phase.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -214,7 +213,6 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum

filter := rule.Filter
query := filter
log.Errorf("DEBUG: building table plan for %s with filter: %s", tableName, filter)
// generate equivalent select statement if filter is empty or a keyrange.
switch {
case filter == "":
Expand Down
22 changes: 11 additions & 11 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ type Plan struct {
// of the table.
Filters []Filter

// Predicates in the Filter query that we can push down to
// MySQL to reduce the returned rows we need to filter.
// This will contain any valid expressions in the Filter's
// WHERE clause with the exception of the in_keyrange()
// function which is a filter that must be applied by the
// vstreamer (it's not a valid MySQL function).
// Predicates in the Filter query that we can push down to MySQL
// to reduce the returned rows we need to filter in the VStreamer
// during the copy phase. This will contain any valid expressions
// in the Filter's WHERE clause with the exception of the
// in_keyrange() function which is a filter that must be applied
// by the VStreamer (it's not a valid MySQL function). Note that
// the Filter cannot contain any MySQL functions because the
// VStreamer cannot filter binlog events using them.
whereExprsToPushDown []sqlparser.Expr

// Convert any integer values seen in the binlog events for ENUM or SET
Expand Down Expand Up @@ -572,9 +574,9 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if where == nil {
return nil
}
// Only a series of AND expressions are supported.
exprs := splitAndExpression(nil, where.Expr)
for _, expr := range exprs {
log.Errorf("DEBUG: analyzing where expression of type %T: %v", expr, sqlparser.String(expr))
switch expr := expr.(type) {
case *sqlparser.ComparisonExpr:
opcode, err := getOpcode(expr)
Expand Down Expand Up @@ -628,11 +630,10 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the expressions that get pushed down to mysqld.
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.FuncExpr:
// We cannot filter binlog events in vstreamer using MySQL functions so
// we only allow the in_keyrange() function, which is vstreamer specific.
// We cannot filter binlog events in VStreamer using MySQL functions so
// we only allow the in_keyrange() function, which is VStreamer specific.
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand All @@ -658,7 +659,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Opcode: IsNotNull,
ColNum: colnum,
})
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
Expand Down
12 changes: 5 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (rs *rowStreamer) Stream() error {
func (rs *rowStreamer) buildPlan() error {
// This pre-parsing is required to extract the table name
// and create its metadata.
log.Errorf("DEBUG: building rowstreamer plan from query: %s", rs.query)
sel, fromTable, err := analyzeSelect(rs.query, rs.se.Environment().Parser())
if err != nil {
return err
Expand Down Expand Up @@ -177,7 +176,6 @@ func (rs *rowStreamer) buildPlan() error {
log.Errorf("%s", err.Error())
return err
}
log.Errorf("DEBUG: rowstreamer plan: %+v", rs.plan)

directives := sel.Comments.Directives()
if s, found := directives.GetString("ukColumns", ""); found {
Expand All @@ -200,7 +198,6 @@ func (rs *rowStreamer) buildPlan() error {
if err != nil {
return err
}
log.Errorf("DEBUG: rowstreamer final query: %s", rs.sendQuery)
return err
}

Expand Down Expand Up @@ -269,7 +266,6 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
if i != 0 {
buf.Myprintf(" and ")
}
log.Errorf("DEBUG: adding pushdown expression to rowstreamer query: %s", sqlparser.String(expr))
buf.Myprintf("%s", sqlparser.String(expr))
}
}
Expand All @@ -289,15 +285,16 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName)
}
buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint)
if len(rs.lastpk) != 0 {
if len(rs.lastpk) != 0 { // We're in the copy phase and need to resume
if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)",
st.Name, rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where ")
addPushdownExpressions()
// First we add any predicates that should be pushed down.
addPushdownExpressions()
if len(rs.plan.whereExprsToPushDown) > 0 {
// Only AND expressions are supported.
buf.Myprintf(" and ")
}
prefix := ""
Expand All @@ -312,13 +309,14 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
for i, pk := range rs.pkColumns[:lastcol] {
buf.Myprintf("%v = ", sqlparser.NewIdentifierCI(rs.plan.Table.Fields[pk].Name))
rs.lastpk[i].EncodeSQL(buf)
// Only AND expressions are supported.
buf.Myprintf(" and ")
}
buf.Myprintf("%v > ", sqlparser.NewIdentifierCI(rs.plan.Table.Fields[rs.pkColumns[lastcol]].Name))
rs.lastpk[lastcol].EncodeSQL(buf)
buf.Myprintf(")")
}
} else if len(rs.plan.whereExprsToPushDown) > 0 {
} else if len(rs.plan.whereExprsToPushDown) > 0 { // We're in the running/replicating phase
buf.Myprintf(" where ")
addPushdownExpressions()
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ func (uvs *uvstreamer) buildTablePlan() error {
if rule == nil {
continue
}
log.Errorf("DEBUG: building table plan for table %s, with filter: %s", tableName, rule.Filter)
plan := &tablePlan{
tablePK: nil,
rule: &binlogdatapb.Rule{
Expand Down

0 comments on commit d708223

Please sign in to comment.