Skip to content

Commit

Permalink
change to rewriting apply joins in later phases instead of creating t…
Browse files Browse the repository at this point in the history
…he value joins up front - wip

Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Feb 19, 2025
1 parent 6dd5f73 commit 9ebd663
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 36 deletions.
70 changes: 68 additions & 2 deletions go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
const (
physicalTransform Phase = iota
initialPlanning
rewriteApplyJoin
pullDistinctFromUnion
delegateAggregation
recursiveCTEHorizons
Expand All @@ -50,6 +51,8 @@ func (p Phase) String() string {
return "physicalTransform"
case initialPlanning:
return "initial horizon planning optimization"
case rewriteApplyJoin:
return "rewrite ApplyJoin to ValuesJoin"
case pullDistinctFromUnion:
return "pull distinct from UNION"
case delegateAggregation:
Expand All @@ -69,8 +72,11 @@ func (p Phase) String() string {
}
}

func (p Phase) shouldRun(s semantics.QuerySignature) bool {
func (p Phase) shouldRun(ctx *plancontext.PlanningContext) bool {
s := ctx.SemTable.QuerySignature
switch p {
case rewriteApplyJoin:
return ctx.AllowValuesJoin
case pullDistinctFromUnion:
return s.Union
case delegateAggregation:
Expand All @@ -85,6 +91,7 @@ func (p Phase) shouldRun(s semantics.QuerySignature) bool {
return s.SubQueries
case dmlWithInput:
return s.DML

default:
return true
}
Expand All @@ -106,11 +113,70 @@ func (p Phase) act(ctx *plancontext.PlanningContext, op Operator) Operator {
return settleSubqueries(ctx, op)
case dmlWithInput:
return findDMLAboveRoute(ctx, op)
case rewriteApplyJoin:
visit := func(op Operator, lhsTables semantics.TableSet, isRoot bool) (Operator, *ApplyResult) {
aj, ok := op.(*ApplyJoin)
if !ok {
return op, NoRewrite
}

vj := newValuesJoin(ctx, aj.LHS, aj.RHS, aj.JoinType)
if vj == nil {
return op, NoRewrite
}

for _, pred := range aj.JoinPredicates.columns {
err := ctx.SkipJoinPredicates(pred.Original)
if err != nil {
panic(err)
}
vj.AddJoinPredicate(ctx, pred.Original)
}

for _, column := range aj.JoinColumns.columns {
vj.AddColumn(ctx, true, false, aeWrap(column.Original))
}

return vj, Rewrote("rewrote ApplyJoin to ValuesJoin")
}

return TopDown(op, TableID, visit, stopAtRoute)

default:
return op
}
}

func newValuesJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinType sqlparser.JoinType) *ValuesJoin {
lhsID := TableID(lhs)
if lhsID.NumberOfTables() > 1 || !joinType.IsInner() {
return nil
}
lhsTableName := getTableName(ctx, lhsID)
bindVariableName := ctx.ReservedVars.ReserveVariable("values")
v := &Values{
unaryOperator: newUnaryOp(rhs),
Name: lhsTableName,
Arg: bindVariableName,
}
return &ValuesJoin{
binaryOperator: newBinaryOp(lhs, v),
BindVarName: bindVariableName,
}
}

func getTableName(ctx *plancontext.PlanningContext, lhsID semantics.TableSet) string {
lhsTableInfo, err := ctx.SemTable.TableInfoFor(lhsID)
if err != nil {
panic(vterrors.VT13001(err.Error())) // TODO: make name up instead of panic, same below
}
lhsTableName, err := lhsTableInfo.Name()
if err != nil {
panic(vterrors.VT13001(err.Error()))
}
return lhsTableName.Name.String()
}

type phaser struct {
current Phase
}
Expand All @@ -124,7 +190,7 @@ func (p *phaser) next(ctx *plancontext.PlanningContext) Phase {

p.current++

if curr.shouldRun(ctx.SemTable.QuerySignature) {
if curr.shouldRun(ctx) {
return curr
}
}
Expand Down
34 changes: 27 additions & 7 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,19 @@ func runPhases(ctx *plancontext.PlanningContext, root Operator) Operator {
}

op = phase.act(ctx, op)
op = runRewriters(ctx, op)
op = runPushDownRewriters(ctx, op)
op = compact(ctx, op)
}

ctx.CurrentPhase = int(DONE)

op = runPushDownRewriters(ctx, op)
op = compact(ctx, op)

return addGroupByOnRHSOfJoin(op)
}

func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
func runPushDownRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) {
switch in := in.(type) {
case *Horizon:
Expand Down Expand Up @@ -105,24 +110,34 @@ func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
case *RecurseCTE:
return tryMergeRecurse(ctx, in)
case *Values:
return tryPushValues(in)
return tryPushValues(ctx, in)
default:
return in, NoRewrite
}
}

if pbm, ok := root.(*PercentBasedMirror); ok {
pbm.SetInputs([]Operator{
runRewriters(ctx, pbm.Operator()),
runRewriters(ctx.UseMirror(), pbm.Target()),
runPushDownRewriters(ctx, pbm.Operator()),
runPushDownRewriters(ctx.UseMirror(), pbm.Target()),
})
}

return FixedPointBottomUp(root, TableID, visitor, stopAtRoute)
}

func tryPushValues(in *Values) (Operator, *ApplyResult) {
if src, ok := in.Source.(*Route); ok {
func tryPushValues(ctx *plancontext.PlanningContext, in *Values) (Operator, *ApplyResult) {
switch src := in.Source.(type) {
case *ValuesJoin:
src.LHS = in.Clone([]Operator{src.LHS})
src.RHS = in.Clone([]Operator{src.RHS})
return src, Rewrote("pushed values under value join")
case *Filter:
return Swap(in, src, "pushed values under filter")
case *Route:
if !reachedPhase(ctx, rewriteApplyJoin+1) {
return in, NoRewrite
}
return Swap(in, src, "pushed values under route")
}
return in, NoRewrite
Expand Down Expand Up @@ -707,6 +722,11 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App
}
src.Outer, in.Source = in, src.Outer
return src, Rewrote("push filter to outer query in subquery container")
case *ValuesJoin:
for _, pred := range in.Predicates {
src.AddJoinPredicate(ctx, pred)
}
return src, Rewrote("pushed filter predicates through values join")
}

return in, NoRewrite
Expand Down
29 changes: 2 additions & 27 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,31 +287,6 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) (requ
return
}

func newJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinType sqlparser.JoinType) JoinOp {
lhsID := TableID(lhs)
if !ctx.AllowValuesJoin || lhsID.NumberOfTables() > 1 || !joinType.IsInner() {
return NewApplyJoin(ctx, lhs, rhs, nil, joinType)
}
lhsTableInfo, err := ctx.SemTable.TableInfoFor(lhsID)
if err != nil {
panic(vterrors.VT13001(err.Error()))
}
lhsTableName, err := lhsTableInfo.Name()
if err != nil {
panic(vterrors.VT13001(err.Error()))
}
bindVariableName := ctx.ReservedVars.ReserveVariable("values")
v := &Values{
unaryOperator: newUnaryOp(rhs),
Name: lhsTableName.Name.String(),
Arg: bindVariableName,
}
return &ValuesJoin{
binaryOperator: newBinaryOp(lhs, v),
BindVarName: bindVariableName,
}
}

func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, joinType sqlparser.JoinType) (Operator, *ApplyResult) {
jm := newJoinMerge(joinPredicates, joinType)
newPlan := jm.mergeJoinInputs(ctx, lhs, rhs, joinPredicates)
Expand All @@ -330,14 +305,14 @@ func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredic
return join, Rewrote("use a hash join because we have LIMIT on the LHS")
}

join := newJoin(ctx, Clone(rhs), Clone(lhs), joinType)
join := NewApplyJoin(ctx, Clone(rhs), Clone(lhs), nil, joinType)
for _, pred := range joinPredicates {
join.AddJoinPredicate(ctx, pred)
}
return join, Rewrote("logical join to applyJoin, switching side because LIMIT")
}

join := newJoin(ctx, Clone(lhs), Clone(rhs), joinType)
join := NewApplyJoin(ctx, Clone(lhs), Clone(rhs), nil, joinType)
for _, pred := range joinPredicates {
join.AddJoinPredicate(ctx, pred)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Values struct {

func (v *Values) Clone(inputs []Operator) Operator {
clone := *v

if len(inputs) > 0 {
clone.Source = inputs[0]
}
return &clone
}

Expand Down

0 comments on commit 9ebd663

Please sign in to comment.