From a5fa24683cebe896b09ab5dcef9b2b026f22bc5a Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 21 Feb 2025 13:14:38 +0100 Subject: [PATCH] wip --- go/vt/vtgate/planbuilder/operators/filter.go | 14 -------- go/vt/vtgate/planbuilder/operators/join.go | 8 ++--- .../planbuilder/operators/projection.go | 34 ++++--------------- .../operators/projection_pushing.go | 6 ++++ .../planbuilder/operators/query_planning.go | 12 +++++++ .../planbuilder/operators/route_planning.go | 3 ++ 6 files changed, 32 insertions(+), 45 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/filter.go b/go/vt/vtgate/planbuilder/operators/filter.go index 431461d4ab1..b8a8a10eda0 100644 --- a/go/vt/vtgate/planbuilder/operators/filter.go +++ b/go/vt/vtgate/planbuilder/operators/filter.go @@ -100,20 +100,6 @@ func (f *Filter) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { return f.Source.GetOrdering(ctx) } -func (f *Filter) Compact(*plancontext.PlanningContext) (Operator, *ApplyResult) { - if len(f.Predicates) == 0 { - return f.Source, Rewrote("filter with no predicates removed") - } - - other, isFilter := f.Source.(*Filter) - if !isFilter { - return f, NoRewrite - } - f.Source = other.Source - f.Predicates = append(f.Predicates, other.Predicates...) - return f, Rewrote("two filters merged into one") -} - func (f *Filter) planOffsets(ctx *plancontext.PlanningContext) Operator { cfg := &evalengine.Config{ ResolveType: ctx.TypeForExpr, diff --git a/go/vt/vtgate/planbuilder/operators/join.go b/go/vt/vtgate/planbuilder/operators/join.go index 5e8dbc4eabd..1673a7b68f2 100644 --- a/go/vt/vtgate/planbuilder/operators/join.go +++ b/go/vt/vtgate/planbuilder/operators/join.go @@ -49,16 +49,16 @@ func (j *Join) GetOrdering(*plancontext.PlanningContext) []OrderBy { return nil } -func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult) { +func (j *Join) tryCompact(ctx *plancontext.PlanningContext) Operator { if !j.JoinType.IsCommutative() { // if we can't move tables around, we can't merge these inputs - return j, NoRewrite + return nil } lqg, lok := j.LHS.(*QueryGraph) rqg, rok := j.RHS.(*QueryGraph) if !lok || !rok { - return j, NoRewrite + return nil } newOp := &QueryGraph{ @@ -69,7 +69,7 @@ func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult if j.Predicate != nil { newOp.collectPredicate(ctx, j.Predicate) } - return newOp, Rewrote("merge querygraphs into a single one") + return newOp } func createStraightJoin(ctx *plancontext.PlanningContext, join *sqlparser.JoinTableExpr, lhs, rhs Operator) Operator { diff --git a/go/vt/vtgate/planbuilder/operators/projection.go b/go/vt/vtgate/planbuilder/operators/projection.go index 0f59d445714..295a0056250 100644 --- a/go/vt/vtgate/planbuilder/operators/projection.go +++ b/go/vt/vtgate/planbuilder/operators/projection.go @@ -508,12 +508,10 @@ func (p *Projection) Compact(ctx *plancontext.PlanningContext) (Operator, *Apply return p.Source, Rewrote("removed projection only passing through the input") } - switch src := p.Source.(type) { - case *Route: - return p.compactWithRoute(ctx, src) - case *ApplyJoin: - return p.compactWithJoin(ctx, src) - } + // switch src := p.Source.(type) { + // case *ApplyJoin: + // return p.compactWithJoin(ctx, src) + // } return p, NoRewrite } @@ -531,6 +529,9 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App if col.Original.As.NotEmpty() { return p, NoRewrite } + if len(join.Columns) == 0 { + return p, NoRewrite + } newColumns = append(newColumns, join.Columns[colInfo]) newColumnsAST.add(join.JoinColumns.columns[colInfo]) case nil: @@ -555,27 +556,6 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App return join, Rewrote("remove projection from before join") } -func (p *Projection) compactWithRoute(ctx *plancontext.PlanningContext, rb *Route) (Operator, *ApplyResult) { - ap, err := p.GetAliasedProjections() - if err != nil { - return p, NoRewrite - } - - for i, col := range ap { - offset, ok := col.Info.(Offset) - if !ok || int(offset) != i { - return p, NoRewrite - } - } - columns := rb.GetColumns(ctx) - - if len(columns) == len(ap) { - return rb, Rewrote("remove projection from before route") - } - rb.ResultColumns = len(columns) - return rb, NoRewrite -} - // needsEvaluation finds the expression given by this argument and checks if the inside and outside expressions match // we can't rely on the content of the info field since it's not filled in until offset plan time func (p *Projection) needsEvaluation(ctx *plancontext.PlanningContext, e sqlparser.Expr) bool { diff --git a/go/vt/vtgate/planbuilder/operators/projection_pushing.go b/go/vt/vtgate/planbuilder/operators/projection_pushing.go index 1d1a5a04501..fa9926e90ba 100644 --- a/go/vt/vtgate/planbuilder/operators/projection_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/projection_pushing.go @@ -82,6 +82,11 @@ func tryPushProjection( case *Limit: return Swap(p, src, "push projection under limit") case *ApplyJoin: + op, res := p.compactWithJoin(ctx, src) + if res != NoRewrite { + return op, res + } + if p.FromAggr || !p.canPush(ctx) { return p, NoRewrite } @@ -239,6 +244,7 @@ func pushProjectionInApplyJoin( // we can't push down expression evaluation to the rhs if we are not sure if it will even be executed return p, NoRewrite } + if IsOuter(src) { // for outer joins, we have to check that we can send down the projection to the rhs for _, expr := range ap.GetColumns() { diff --git a/go/vt/vtgate/planbuilder/operators/query_planning.go b/go/vt/vtgate/planbuilder/operators/query_planning.go index f70392252c1..0f8b2da9a88 100644 --- a/go/vt/vtgate/planbuilder/operators/query_planning.go +++ b/go/vt/vtgate/planbuilder/operators/query_planning.go @@ -699,6 +699,18 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App } src.Outer, in.Source = in, src.Outer return src, Rewrote("push filter to outer query in subquery container") + case *Filter: + if len(in.Predicates) == 0 { + return in.Source, Rewrote("filter with no predicates removed") + } + + other, isFilter := in.Source.(*Filter) + if !isFilter { + return in, NoRewrite + } + in.Source = other.Source + in.Predicates = append(in.Predicates, other.Predicates...) + return in, Rewrote("two filters merged into one") } return in, NoRewrite diff --git a/go/vt/vtgate/planbuilder/operators/route_planning.go b/go/vt/vtgate/planbuilder/operators/route_planning.go index b5bcf37c127..c42d370eda3 100644 --- a/go/vt/vtgate/planbuilder/operators/route_planning.go +++ b/go/vt/vtgate/planbuilder/operators/route_planning.go @@ -53,6 +53,9 @@ func pushDerived(ctx *plancontext.PlanningContext, op *Horizon) (Operator, *Appl } func optimizeJoin(ctx *plancontext.PlanningContext, op *Join) (Operator, *ApplyResult) { + if newOp := op.tryCompact(ctx); newOp != nil { + return newOp, Rewrote("merged query graphs") + } return mergeOrJoin(ctx, op.LHS, op.RHS, sqlparser.SplitAndExpression(nil, op.Predicate), op.JoinType) }