Skip to content

Commit

Permalink
refactor: planner refactor - try to simplify compactable implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Feb 24, 2025
1 parent 413b953 commit f9e87a5
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 52 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (qb *queryBuilder) addPredicate(expr sqlparser.Expr) {
addPred = sel.AddWhere
qb.stmt = sel
default:
panic(fmt.Sprintf("cant add WHERE to %T", qb.stmt))
panic(fmt.Sprintf("cant add WHERE to %T, %s", qb.stmt, sqlparser.String(expr)))
}

for _, exp := range sqlparser.SplitAndExpression(nil, expr) {
Expand Down
14 changes: 0 additions & 14 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,6 @@ func (f *Filter) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return f.Source.GetOrdering(ctx)
}

func (f *Filter) Compact(*plancontext.PlanningContext) (Operator, *ApplyResult) {
if len(f.Predicates) == 0 {
return f.Source, Rewrote("filter with no predicates removed")
}

other, isFilter := f.Source.(*Filter)
if !isFilter {
return f, NoRewrite
}
f.Source = other.Source
f.Predicates = append(f.Predicates, other.Predicates...)
return f, Rewrote("two filters merged into one")
}

func (f *Filter) planOffsets(ctx *plancontext.PlanningContext) Operator {
cfg := &evalengine.Config{
ResolveType: ctx.TypeForExpr,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ func (j *Join) GetOrdering(*plancontext.PlanningContext) []OrderBy {
return nil
}

func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult) {
func (j *Join) tryCompact(ctx *plancontext.PlanningContext) Operator {
if !j.JoinType.IsCommutative() {
// if we can't move tables around, we can't merge these inputs
return j, NoRewrite
return nil
}

lqg, lok := j.LHS.(*QueryGraph)
rqg, rok := j.RHS.(*QueryGraph)
if !lok || !rok {
return j, NoRewrite
return nil
}

newOp := &QueryGraph{
Expand All @@ -69,7 +69,7 @@ func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult
if j.Predicate != nil {
newOp.collectPredicate(ctx, j.Predicate)
}
return newOp, Rewrote("merge querygraphs into a single one")
return newOp
}

func createStraightJoin(ctx *plancontext.PlanningContext, join *sqlparser.JoinTableExpr, lhs, rhs Operator) Operator {
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func PlanQuery(ctx *plancontext.PlanningContext, stmt sqlparser.Statement) (resu
fmt.Println(ToTree(op))
}

op = compact(ctx, op)
checkValid(op)
op = planQuery(ctx, op)

Expand Down
41 changes: 11 additions & 30 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,29 +488,28 @@ func (p *Projection) ShortDescription() string {
return strings.Join(result, ", ")
}

func (p *Projection) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult) {
func (p *Projection) isNeeded() bool {
ap, err := p.GetAliasedProjections()
if err != nil {
return p, NoRewrite
return true // we don't know enough about the columns to make a decision
}

// for projections that are not derived tables, we can check if it is safe to remove or not
needed := false
for i, projection := range ap {
e, ok := projection.Info.(Offset)
if !ok || int(e) != i || projection.Original.As.NotEmpty() {
needed = true
break
return true
}
}

if !needed {
return false
}

func (p *Projection) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult) {
if !p.isNeeded() {
return p.Source, Rewrote("removed projection only passing through the input")
}

switch src := p.Source.(type) {
case *Route:
return p.compactWithRoute(ctx, src)
case *ApplyJoin:
return p.compactWithJoin(ctx, src)
}
Expand All @@ -531,6 +530,9 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App
if col.Original.As.NotEmpty() {
return p, NoRewrite
}
if len(join.Columns) == 0 {
return p, NoRewrite
}
newColumns = append(newColumns, join.Columns[colInfo])
newColumnsAST.add(join.JoinColumns.columns[colInfo])
case nil:
Expand All @@ -555,27 +557,6 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App
return join, Rewrote("remove projection from before join")
}

func (p *Projection) compactWithRoute(ctx *plancontext.PlanningContext, rb *Route) (Operator, *ApplyResult) {
ap, err := p.GetAliasedProjections()
if err != nil {
return p, NoRewrite
}

for i, col := range ap {
offset, ok := col.Info.(Offset)
if !ok || int(offset) != i {
return p, NoRewrite
}
}
columns := rb.GetColumns(ctx)

if len(columns) == len(ap) {
return rb, Rewrote("remove projection from before route")
}
rb.ResultColumns = len(columns)
return rb, NoRewrite
}

// needsEvaluation finds the expression given by this argument and checks if the inside and outside expressions match
// we can't rely on the content of the info field since it's not filled in until offset plan time
func (p *Projection) needsEvaluation(ctx *plancontext.PlanningContext, e sqlparser.Expr) bool {
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/planbuilder/operators/projection_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,21 @@ func tryPushProjection(
ctx *plancontext.PlanningContext,
p *Projection,
) (Operator, *ApplyResult) {
if !p.isNeeded() && !p.isDerived() {
return p.Source, Rewrote("removed projection only passing through the input")
}

switch src := p.Source.(type) {
case *Route:
return Swap(p, src, "push projection under route")
case *Limit:
return Swap(p, src, "push projection under limit")
case *ApplyJoin:
op, res := p.compactWithJoin(ctx, src)
if res != NoRewrite {
return op, res
}

if p.FromAggr || !p.canPush(ctx) {
return p, NoRewrite
}
Expand Down Expand Up @@ -239,6 +248,7 @@ func pushProjectionInApplyJoin(
// we can't push down expression evaluation to the rhs if we are not sure if it will even be executed
return p, NoRewrite
}

if IsOuter(src) {
// for outer joins, we have to check that we can send down the projection to the rhs
for _, expr := range ap.GetColumns() {
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func runPhases(ctx *plancontext.PlanningContext, root Operator) Operator {

op = phase.act(ctx, op)
op = runRewriters(ctx, op)
op = compact(ctx, op)
}

return addGroupByOnRHSOfJoin(op)
Expand Down Expand Up @@ -699,6 +698,18 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App
}
src.Outer, in.Source = in, src.Outer
return src, Rewrote("push filter to outer query in subquery container")
case *Filter:
if len(in.Predicates) == 0 {
return in.Source, Rewrote("filter with no predicates removed")
}

other, isFilter := in.Source.(*Filter)
if !isFilter {
return in, NoRewrite
}
in.Source = other.Source
in.Predicates = append(in.Predicates, other.Predicates...)
return in, Rewrote("two filters merged into one")
}

return in, NoRewrite
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func pushDerived(ctx *plancontext.PlanningContext, op *Horizon) (Operator, *Appl
}

func optimizeJoin(ctx *plancontext.PlanningContext, op *Join) (Operator, *ApplyResult) {
if newOp := op.tryCompact(ctx); newOp != nil {
return newOp, Rewrote("merged query graphs")
}
return mergeOrJoin(ctx, op.LHS, op.RHS, sqlparser.SplitAndExpression(nil, op.Predicate), op.JoinType)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (s *planTestSuite) TestOneWithTPCHVSchema() {
defer reset()

env := vtenv.NewTestEnv()
vschema := loadSchema(s.T(), "vschemas/schema.json", true)
vschema := loadSchema(s.T(), "vschemas/tpch_schema.json", true)
vw, err := vschemawrapper.NewVschemaWrapper(env, vschema, TestBuilder)
require.NoError(s.T(), err)

Expand Down

0 comments on commit f9e87a5

Please sign in to comment.