Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator for join-values #17641

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
455fe86
Cherry-picked the engine primitive for values join
frouioui Jan 14, 2025
bd6d67f
Enhance ValuesJoin engine and improve unit test
frouioui Jan 15, 2025
491f190
Merge remote-tracking branch 'origin/main' into values-join-engine
frouioui Jan 22, 2025
b4f62e3
wip - dirty implementation of findRoute for Values
frouioui Jan 22, 2025
9dda778
Finalize the Values OpCode and TestFindRouteValuesJoin
frouioui Jan 23, 2025
b265101
Use MultiEqual instead of Values OpCode
frouioui Jan 23, 2025
16a30f3
Add missing headers
frouioui Jan 28, 2025
e9fb2c7
Remove routing multi equal optimization
frouioui Jan 28, 2025
5793d60
Add VALUES operator and SQL_builder bit
frouioui Jan 28, 2025
2e7ae91
Add the ValuesJoin operator and SQL_builder test
frouioui Jan 29, 2025
7a695b0
Rename SQL_builder -> op_to_ast
frouioui Jan 29, 2025
ba53da7
add logic for splitting expressions
systay Jan 30, 2025
e2f4e37
offset planning for ValuesJoin
frouioui Feb 4, 2025
b824a25
feat: change the value-join to have a RowID mode
frouioui Feb 13, 2025
df43f22
Merge remote-tracking branch 'origin/main' into values-join-planner
frouioui Feb 13, 2025
b715c40
Use ROW_TUPLE and simplify planOffsets
frouioui Feb 13, 2025
6b5f854
Make sure Values operator conserve the table qualifier
frouioui Feb 13, 2025
f534a4c
Revert changes to onecase
frouioui Feb 13, 2025
910796a
Addition of a macro benchmark for values join
frouioui Feb 13, 2025
6dd5f73
Add comment directive to enable values joins and more benchmarks
frouioui Feb 17, 2025
9ebd663
change to rewriting apply joins in later phases instead of creating t…
systay Feb 19, 2025
4dd3589
add columns before predicates
systay Feb 20, 2025
df126a2
refactor: change how we handle Values and ValuesJoin planning - wip
systay Feb 20, 2025
c558b3f
feat: fix pushing of values and filters to a value join
GuptaManan100 Feb 21, 2025
9efb1f9
Merge remote-tracking branch 'upstream/main' into values-join-planner
systay Feb 21, 2025
3e4a6ae
wip
systay Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package misc
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"testing"
Expand All @@ -34,7 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/utils"
)

func start(t *testing.T) (utils.MySQLCompare, func()) {
func start(t testing.TB) (utils.MySQLCompare, func()) {
mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams)
require.NoError(t, err)

Expand All @@ -53,6 +54,96 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
}
}

func BenchmarkValuesJoin(b *testing.B) {
mcmp, closer := start(b)
defer closer()

type Rep struct {
QueriesRouted map[string]int `json:"QueriesRouted"`
}

getQueriesRouted := func(thisB *testing.B) int {
_, response, _ := clusterInstance.VtgateProcess.MakeAPICall("/debug/vars")
r := Rep{}
err := json.Unmarshal([]byte(response), &r)
require.NoError(thisB, err)

var res int
for _, c := range r.QueriesRouted {
res += c
}
return res
}

b.ReportAllocs()

lhsRowCount := 0
rhsRowCount := 0

insertLHS := func(count int) {
for ; lhsRowCount < count; lhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into t1(id1, id2) values (%d, %d)", lhsRowCount, lhsRowCount))
}
}
insertRHS := func(count int) {
for ; rhsRowCount < count; rhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into tbl(id, unq_col, nonunq_col) values (%d, %d, %d)", rhsRowCount, rhsRowCount, rhsRowCount))
}
}

testCases := []struct {
lhsRowCount int
rhsRowCount int
}{
{
lhsRowCount: 20,
rhsRowCount: 10,
},
{
lhsRowCount: 50,
rhsRowCount: 25,
},
{
lhsRowCount: 100,
rhsRowCount: 50,
},
{
lhsRowCount: 200,
rhsRowCount: 100,
},
{
lhsRowCount: 500,
rhsRowCount: 250,
},
{
lhsRowCount: 1000,
rhsRowCount: 500,
},
{
lhsRowCount: 2000,
rhsRowCount: 1000,
},
}

var previousQueriesRoutedSum int
for _, testCase := range testCases {
insertLHS(testCase.lhsRowCount)
insertRHS(testCase.rhsRowCount)

b.Run(fmt.Sprintf("LHS(%d) RHS(%d)", testCase.lhsRowCount, testCase.rhsRowCount), func(b *testing.B) {
for range b.N {
mcmp.Exec("select t1.id1, tbl.id from t1, tbl where t1.id2 = tbl.nonunq_col")
}
b.StopTimer()

totalQueriesRouted := getQueriesRouted(b)
queriesRouted := totalQueriesRouted - previousQueriesRoutedSum
previousQueriesRoutedSum = totalQueriesRouted
b.ReportMetric(float64(queriesRouted/b.N), "queries_routed/op")
})
}
}

func TestBitVals(t *testing.T) {
mcmp, closer := start(t)
defer closer()
Expand Down
6 changes: 6 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveAllowValuesJoin allows the planner to use VALUES JOINS when possible.
DirectiveAllowValuesJoin = "ALLOW_VALUES_JOIN"
// DirectiveVExplainRunDMLQueries tells vexplain queries/all that it is okay to also run the query.
DirectiveVExplainRunDMLQueries = "EXECUTE_DML_QUERIES"
// DirectiveConsolidator enables the query consolidator.
Expand Down Expand Up @@ -554,6 +556,10 @@ func AllowScatterDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowScatter)
}

func AllowValuesJoinDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowValuesJoin)
}

func checkDirective(stmt Statement, key string) bool {
cmt, ok := stmt.(Commented)
if ok {
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 18 additions & 18 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDeleteUnsharded(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestDeleteEqual(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestDeleteEqualMultiCol(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
"1|4|5|6",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand All @@ -231,7 +231,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
})

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand All @@ -252,7 +252,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
"1|4|5|6",
"1|7|8|9",
)}
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
vc.results = results

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestDeleteOwnedVindexMultiCol(t *testing.T) {
"1|2|4",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestDeleteSharded(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestDeleteShardedStreaming(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
err := del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error {
return nil
})
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
"1|4|5|6",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand All @@ -453,7 +453,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
})

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand All @@ -475,7 +475,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
"1|4|5|6",
"1|7|8|9",
)}
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
vc.results = results

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -528,7 +528,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) {
"1|3|6",
"2|3|7",
)}
vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestDeleteEqualSubshard(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestDeleteMultiEqual(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand Down Expand Up @@ -635,7 +635,7 @@ func TestDeleteInUnique(t *testing.T) {
Type: querypb.Type_TUPLE,
Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))),
}
vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false)
require.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/dml_with_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
OutputCols: [][]int{{0}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
OutputCols: [][]int{{1, 0}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestDeleteWithMultiTarget(t *testing.T) {
OutputCols: [][]int{{0}, {1, 2}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestUpdateWithInputNonLiteral(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = []*sqltypes.Result{
{RowsAffected: 1}, {RowsAffected: 1}, {RowsAffected: 1},
}
Expand Down
Loading
Loading