Skip to content

Commit

Permalink
Add comment directive to enable values joins and more benchmarks
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui authored and systay committed Feb 19, 2025
1 parent 910796a commit 6dd5f73
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 8 deletions.
89 changes: 82 additions & 7 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package misc
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -57,16 +58,90 @@ func BenchmarkValuesJoin(b *testing.B) {
mcmp, closer := start(b)
defer closer()

for i := 0; i < 200; i++ {
mcmp.Exec(fmt.Sprintf("insert into t1(id1, id2) values (%d, %d)", i, i))
mcmp.Exec(fmt.Sprintf("insert into tbl(id, unq_col, nonunq_col) values (%d, %d, %d)", i, i, i))
type Rep struct {
QueriesRouted map[string]int `json:"QueriesRouted"`
}

b.Run("Simple Joins", func(b *testing.B) {
for range b.N {
mcmp.Exec("select t1.id1, tbl.id from t1, tbl where t1.id2 = tbl.nonunq_col")
getQueriesRouted := func(thisB *testing.B) int {
_, response, _ := clusterInstance.VtgateProcess.MakeAPICall("/debug/vars")
r := Rep{}
err := json.Unmarshal([]byte(response), &r)
require.NoError(thisB, err)

var res int
for _, c := range r.QueriesRouted {
res += c
}
return res
}

b.ReportAllocs()

lhsRowCount := 0
rhsRowCount := 0

insertLHS := func(count int) {
for ; lhsRowCount < count; lhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into t1(id1, id2) values (%d, %d)", lhsRowCount, lhsRowCount))
}
}
insertRHS := func(count int) {
for ; rhsRowCount < count; rhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into tbl(id, unq_col, nonunq_col) values (%d, %d, %d)", rhsRowCount, rhsRowCount, rhsRowCount))
}
})
}

testCases := []struct {
lhsRowCount int
rhsRowCount int
}{
{
lhsRowCount: 20,
rhsRowCount: 10,
},
{
lhsRowCount: 50,
rhsRowCount: 25,
},
{
lhsRowCount: 100,
rhsRowCount: 50,
},
{
lhsRowCount: 200,
rhsRowCount: 100,
},
{
lhsRowCount: 500,
rhsRowCount: 250,
},
{
lhsRowCount: 1000,
rhsRowCount: 500,
},
{
lhsRowCount: 2000,
rhsRowCount: 1000,
},
}

var previousQueriesRoutedSum int
for _, testCase := range testCases {
insertLHS(testCase.lhsRowCount)
insertRHS(testCase.rhsRowCount)

b.Run(fmt.Sprintf("LHS(%d) RHS(%d)", testCase.lhsRowCount, testCase.rhsRowCount), func(b *testing.B) {
for range b.N {
mcmp.Exec("select t1.id1, tbl.id from t1, tbl where t1.id2 = tbl.nonunq_col")
}
b.StopTimer()

totalQueriesRouted := getQueriesRouted(b)
queriesRouted := totalQueriesRouted - previousQueriesRoutedSum
previousQueriesRoutedSum = totalQueriesRouted
b.ReportMetric(float64(queriesRouted/b.N), "queries_routed/op")
})
}
}

func TestBitVals(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveAllowValuesJoin allows the planner to use VALUES JOINS when possible.
DirectiveAllowValuesJoin = "ALLOW_VALUES_JOIN"
// DirectiveVExplainRunDMLQueries tells vexplain queries/all that it is okay to also run the query.
DirectiveVExplainRunDMLQueries = "EXECUTE_DML_QUERIES"
// DirectiveConsolidator enables the query consolidator.
Expand Down Expand Up @@ -554,6 +556,10 @@ func AllowScatterDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowScatter)
}

func AllowValuesJoinDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowValuesJoin)
}

func checkDirective(stmt Statement, key string) bool {
cmt, ok := stmt.(Commented)
if ok {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) (requ

func newJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinType sqlparser.JoinType) JoinOp {
lhsID := TableID(lhs)
if lhsID.NumberOfTables() > 1 || !joinType.IsInner() {
if !ctx.AllowValuesJoin || lhsID.NumberOfTables() > 1 || !joinType.IsInner() {
return NewApplyJoin(ctx, lhs, rhs, nil, joinType)
}
lhsTableInfo, err := ctx.SemTable.TableInfoFor(lhsID)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/planning_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type PlanningContext struct {

PlannerVersion querypb.ExecuteOptions_PlannerVersion

AllowValuesJoin bool

// If we during planning have turned this expression into an argument name,
// we can continue using the same argument name
ReservedArguments map[sqlparser.Expr]string
Expand Down Expand Up @@ -133,6 +135,7 @@ func CreatePlanningContext(stmt sqlparser.Statement,
ReservedArguments: map[sqlparser.Expr]string{},
ValuesJoinColumns: make(map[string]sqlparser.Columns),
Statement: stmt,
AllowValuesJoin: sqlparser.AllowValuesJoinDirective(stmt),
}, nil
}

Expand Down

0 comments on commit 6dd5f73

Please sign in to comment.