Skip to content

Commit

Permalink
add method for pushing one operator under another
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Nov 12, 2022
1 parent 94bda95 commit ba671b2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 22 deletions.
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

type Derived struct {
singleSource
*singleSource

Query sqlparser.SelectStatement
Alias string
Expand All @@ -49,7 +49,7 @@ func newDerived(
columnAliases sqlparser.Columns,
) ops.Operator {
return &Derived{
singleSource: singleSource{Source: src},
singleSource: &singleSource{Source: src},
Query: stmt,
Alias: alias,
ColumnAliases: columnAliases,
Expand All @@ -62,7 +62,7 @@ func (d *Derived) IPhysical() {}
// Clone implements the Operator interface
func (d *Derived) Clone(inputs []ops.Operator) ops.Operator {
return &Derived{
singleSource: singleSource{Source: inputs[0]},
singleSource: &singleSource{Source: inputs[0]},
Query: d.Query,
Alias: d.Alias,
ColumnAliases: sqlparser.CloneColumns(d.ColumnAliases),
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

type Filter struct {
singleSource
*singleSource

Predicates []sqlparser.Expr
}
Expand All @@ -34,7 +34,7 @@ var _ ops.PhysicalOperator = (*Filter)(nil)

func newFilter(op ops.Operator, expr sqlparser.Expr) ops.Operator {
return &Filter{
singleSource: singleSource{Source: op}, Predicates: []sqlparser.Expr{expr},
singleSource: &singleSource{Source: op}, Predicates: []sqlparser.Expr{expr},
}
}

Expand All @@ -46,7 +46,7 @@ func (f *Filter) Clone(inputs []ops.Operator) ops.Operator {
predicatesClone := make([]sqlparser.Expr, len(f.Predicates))
copy(predicatesClone, f.Predicates)
return &Filter{
singleSource: singleSource{Source: inputs[0]},
singleSource: &singleSource{Source: inputs[0]},
Predicates: predicatesClone,
}
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// Horizon is an operator we use until we decide how to handle the source to the horizon.
// It contains information about the planning we have to do after deciding how we will send the query to the tablets.
type Horizon struct {
singleSource
*singleSource

Select sqlparser.SelectStatement

Expand All @@ -39,7 +39,7 @@ func (h *Horizon) IPhysical() {}

func newHorizon(src ops.Operator, stmt sqlparser.SelectStatement) ops.Operator {
return &Horizon{
singleSource: singleSource{src},
singleSource: &singleSource{src},
Select: stmt,
}
}
Expand All @@ -55,7 +55,7 @@ func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.

func (h *Horizon) Clone(inputs []ops.Operator) ops.Operator {
return &Horizon{
singleSource: singleSource{inputs[0]},
singleSource: &singleSource{inputs[0]},
Select: h.Select,
}
}
7 changes: 1 addition & 6 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,8 @@ func planHorizons(in ops.Operator) (ops.Operator, error) {
func planHorizon(in *Horizon) (ops.Operator, error) {
rb, isRoute := in.Source.(*Route)
if isRoute && rb.IsSingleShard() && in.Select.GetLimit() == nil {
return planSingleShardRoute(rb, in)
return swap(in, rb), nil
}

return in, nil
}

func planSingleShardRoute(rb *Route, horizon *Horizon) (ops.Operator, error) {
rb.Source, horizon.Source = horizon, rb.Source
return rb, nil
}
26 changes: 25 additions & 1 deletion go/vt/vtgate/planbuilder/operators/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type (
singleSource struct {
Source ops.Operator
}

singleSourceI interface {
ops.Operator
getSource() ops.Operator
setSource(ops.Operator)
}
)

func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (ops.Operator, error) {
Expand Down Expand Up @@ -89,10 +95,18 @@ func (noInputs) Inputs() []ops.Operator {
}

// Inputs implements the Operator interface
func (s singleSource) Inputs() []ops.Operator {
func (s *singleSource) Inputs() []ops.Operator {
return []ops.Operator{s.Source}
}

func (s *singleSource) getSource() ops.Operator {
return s.Source
}

func (s *singleSource) setSource(src ops.Operator) {
s.Source = src
}

// AddColumn implements the Operator interface
func (noColumns) AddColumn(*plancontext.PlanningContext, sqlparser.Expr) (int, error) {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this operator cannot accept columns")
Expand All @@ -102,3 +116,13 @@ func (noColumns) AddColumn(*plancontext.PlanningContext, sqlparser.Expr) (int, e
func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) (ops.Operator, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this operator cannot accept predicates")
}

// swap takes two operators with a single source and swaps them. the assumption is that the first op provided has the
// second operator as source. this method will the two operators with each other and return the new root.
// this method is used to push one operator underneath the other
func swap(op, src singleSourceI) ops.Operator {
tmp := src.getSource()
src.setSource(op)
op.setSource(tmp)
return src
}
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

type (
Route struct {
singleSource
*singleSource

RouteOpCode engine.Opcode
Keyspace *vindexes.Keyspace
Expand Down Expand Up @@ -99,7 +99,7 @@ func newRoute(
sysTableTableName map[string]evalengine.Expr,
) *Route {
return &Route{
singleSource: singleSource{src},
singleSource: &singleSource{src},
RouteOpCode: opCode,
Keyspace: keyspace,
VindexPreds: vindexPreds,
Expand Down Expand Up @@ -140,7 +140,7 @@ func (r *Route) Cost() int {
// Clone implements the Operator interface
func (r *Route) Clone(inputs []ops.Operator) ops.Operator {
cloneRoute := *r
cloneRoute.singleSource = singleSource{inputs[0]}
cloneRoute.singleSource = &singleSource{inputs[0]}
cloneRoute.VindexPreds = make([]*VindexPlusPredicates, len(r.VindexPreds))
for i, pred := range r.VindexPreds {
// we do this to create a copy of the struct
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ func transformToPhysical(ctx *plancontext.PlanningContext, in ops.Operator) (ops
func optimizeFilter(op *Filter) (ops.Operator, rewrite.TreeIdentity, error) {
if route, ok := op.Source.(*Route); ok {
// let's push the filter into the route
op.Source = route.Source
route.Source = op
return route, rewrite.NewTree, nil
return swap(op, route), rewrite.NewTree, nil
}

return op, rewrite.SameTree, nil
Expand Down

0 comments on commit ba671b2

Please sign in to comment.