Skip to content

Commit

Permalink
Wrap each pushed down expr in parens
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 10, 2025
1 parent e3a0f68 commit 3aa7b51
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
// Only AND expressions are supported.
buf.Myprintf(" and ")
}
buf.Myprintf("%s", sqlparser.String(expr))
buf.Myprintf("(%s)", sqlparser.String(expr))
}
}
// If we know the index name that we should be using then tell MySQL
Expand All @@ -286,15 +286,15 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error
indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName)
}
buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint)
if len(rs.lastpk) != 0 { // We're in Nth copy phase cycle and need to resume
if len(rs.lastpk) != 0 { // We're in the Nth copy phase cycle and need to resume
if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)",
st.Name, rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where ")
// First we add any predicates that should be pushed down.
addPushdownExpressions()
if len(rs.plan.whereExprsToPushDown) > 0 {
addPushdownExpressions()
// Only AND expressions are supported.
buf.Myprintf(" and ")
}
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func TestStreamRowsFilterInt(t *testing.T) {
`fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`,
`rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"4"}`,
}
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t1 where id2 = 100 order by id1"
checkStream(t, "select id1, val from t1 where id2 = 100", nil, wantQuery, wantStream)
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t1 where (id2 = 100) order by id1"
checkStream(t, "select id1, val from t1 where (id2 = 100)", nil, wantQuery, wantStream)
require.Equal(t, int64(0), engine.rowStreamerNumPackets.Get())
require.Equal(t, int64(2), engine.rowStreamerNumRows.Get())
require.Less(t, int64(0), engine.vstreamerPacketSize.Get())
Expand Down Expand Up @@ -374,8 +374,8 @@ func TestStreamRowsFilterVarBinary(t *testing.T) {
`fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`,
`rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"5"}`,
}
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where val = 'newton' order by id1"
checkStream(t, "select id1, val from t1 where val = 'newton'", nil, wantQuery, wantStream)
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where (val = 'newton') order by id1"
checkStream(t, "select id1, val from t1 where (val = 'newton')", nil, wantQuery, wantStream)
}

func TestStreamRowsFilterVarChar(t *testing.T) {
Expand All @@ -399,8 +399,8 @@ func TestStreamRowsFilterVarChar(t *testing.T) {
`fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARCHAR table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:512 charset:45 column_type:"varchar(128)"} pkfields:{name:"id1" type:INT32 charset:63}`,
`rows:{lengths:1 lengths:7 values:"2Ñewton"} rows:{lengths:1 lengths:6 values:"3nEwton"} rows:{lengths:1 lengths:8 values:"5neẅton"} lastpk:{lengths:1 values:"5"}`,
}
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where val = 'newton' order by id1"
checkStream(t, "select id1, val from t1 where val = 'newton'", nil, wantQuery, wantStream)
wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 where (val = 'newton') order by id1"
checkStream(t, "select id1, val from t1 where (val = 'newton')", nil, wantQuery, wantStream)
}

func TestStreamRowsMultiPacket(t *testing.T) {
Expand Down

0 comments on commit 3aa7b51

Please sign in to comment.