Skip to content

Commit

Permalink
feat: added plan type
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Feb 6, 2025
1 parent 9bf3326 commit db6115c
Show file tree
Hide file tree
Showing 55 changed files with 1,945 additions and 237 deletions.
162 changes: 128 additions & 34 deletions go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"bytes"
"encoding/json"
"fmt"
"sync/atomic"
"time"

Expand All @@ -31,41 +32,49 @@ import (
// An instruction (aka Primitive) is typically a tree where
// each node does its part by combining the results of the
// sub-nodes.
type Plan struct {
Type sqlparser.StatementType // The type of query we have
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
}
type (
PlanType int8

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
atomic.AddUint64(&p.ExecCount, execCount)
atomic.AddUint64(&p.ExecTime, uint64(execTime))
atomic.AddUint64(&p.ShardQueries, shardQueries)
atomic.AddUint64(&p.RowsAffected, rowsAffected)
atomic.AddUint64(&p.RowsReturned, rowsReturned)
atomic.AddUint64(&p.Errors, errors)
}
Plan struct {
Type PlanType // Type of plan
QueryType sqlparser.StatementType // Type of query
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query

// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
execCount = atomic.LoadUint64(&p.ExecCount)
execTime = time.Duration(atomic.LoadUint64(&p.ExecTime))
shardQueries = atomic.LoadUint64(&p.ShardQueries)
rowsAffected = atomic.LoadUint64(&p.RowsAffected)
rowsReturned = atomic.LoadUint64(&p.RowsReturned)
errors = atomic.LoadUint64(&p.Errors)
return
ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
}
)

const (
PlanUnknown PlanType = iota
PlanLocal
PlanPassthrough
PlanMultiShard
PlanScatter
PlanLookup
PlanJoinOp
PlanComplex
PlanOnlineDDL
PlanDirectDDL
)

func NewPlan(query string, stmt sqlparser.Statement, primitive Primitive, bindVarNeeds *sqlparser.BindVarNeeds, tablesUsed []string) *Plan {
return &Plan{
Type: getPlanType(primitive),
QueryType: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
}
}

// MarshalJSON serializes the plan into a JSON representation.
Expand All @@ -77,6 +86,7 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
}

marshalPlan := struct {
Type string
QueryType string
Original string `json:",omitempty"`
Instructions *PrimitiveDescription `json:",omitempty"`
Expand All @@ -88,7 +98,8 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
Errors uint64 `json:",omitempty"`
TablesUsed []string `json:",omitempty"`
}{
QueryType: p.Type.String(),
Type: p.Type.String(),
QueryType: p.QueryType.String(),
Original: p.Original,
Instructions: instructions,
ExecCount: atomic.LoadUint64(&p.ExecCount),
Expand All @@ -110,3 +121,86 @@ func (p *Plan) MarshalJSON() ([]byte, error) {

return b.Bytes(), nil
}

func (p PlanType) String() string {
switch p {
case PlanLocal:
return "Local"
case PlanPassthrough:
return "Passthrough"
case PlanMultiShard:
return "MultiShard"
case PlanScatter:
return "Scatter"
case PlanLookup:
return "Lookup"
case PlanJoinOp:
return "Join"
case PlanComplex:
return "Complex"
case PlanOnlineDDL:
return "OnlineDDL"
case PlanDirectDDL:
return "DirectDDL"
default:
return "Unknown"
}
}

func getPlanType(p Primitive) PlanType {
switch prim := p.(type) {
case *Route:
return getPlanTypeFromRoutingParams(prim.RoutingParameters)
case *Update:
return getPlanTypeFromRoutingParams(prim.RoutingParameters)
case *Delete:
return getPlanTypeFromRoutingParams(prim.RoutingParameters)
case *Insert:

case *Send:
case *Join:
case *DDL:
default:
return PlanComplex

}
return PlanUnknown
}

func getPlanTypeFromRoutingParams(rp *RoutingParameters) PlanType {
if rp == nil {
panic("RoutingParameters is nil, cannot determine plan type")
}
switch rp.Opcode {
case Unsharded, EqualUnique, Next, DBA, Reference:
return PlanPassthrough
case Equal, IN, Between, MultiEqual, SubShard, ByDestination:
return PlanMultiShard
case Scatter:
return PlanScatter
case None:
return PlanLocal
}
panic(fmt.Sprintf("cannot determine plan type for the given opcode: %s", rp.Opcode.String()))
}

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
atomic.AddUint64(&p.ExecCount, execCount)
atomic.AddUint64(&p.ExecTime, uint64(execTime))
atomic.AddUint64(&p.ShardQueries, shardQueries)
atomic.AddUint64(&p.RowsAffected, rowsAffected)
atomic.AddUint64(&p.RowsReturned, rowsReturned)
atomic.AddUint64(&p.Errors, errors)
}

// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
execCount = atomic.LoadUint64(&p.ExecCount)
execTime = time.Duration(atomic.LoadUint64(&p.ExecTime))
shardQueries = atomic.LoadUint64(&p.ShardQueries)
rowsAffected = atomic.LoadUint64(&p.RowsAffected)
rowsReturned = atomic.LoadUint64(&p.RowsReturned)
errors = atomic.LoadUint64(&p.Errors)
return
}
10 changes: 5 additions & 5 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (e *Executor) StreamExecute(
var seenResults atomic.Bool
var resultMu sync.Mutex
result := &sqltypes.Result{}
if canReturnRows(plan.Type) {
if canReturnRows(plan.QueryType) {
srr.callback = func(qr *sqltypes.Result) error {
resultMu.Lock()
defer resultMu.Unlock()
Expand Down Expand Up @@ -345,18 +345,18 @@ func (e *Executor) StreamExecute(

// 4: Execute!
err := vc.StreamExecutePrimitive(ctx, plan.Instructions, bindVars, true, func(qr *sqltypes.Result) error {
return srr.storeResultStats(plan.Type, qr)
return srr.storeResultStats(plan.QueryType, qr)
})

// Check if there was partial DML execution. If so, rollback the effect of the partially executed query.
if err != nil {
if !canReturnRows(plan.Type) {
if !canReturnRows(plan.QueryType) {
return e.rollbackExecIfNeeded(ctx, safeSession, bindVars, logStats, err)
}
return err
}

if !canReturnRows(plan.Type) {
if !canReturnRows(plan.QueryType) {
return nil
}

Expand Down Expand Up @@ -435,7 +435,7 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
var qr *sqltypes.Result
var stmtType sqlparser.StatementType
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
stmtType = plan.Type
stmtType = plan.QueryType
qr, err = e.executePlan(ctx, safeSession, plan, vc, bindVars, logStats, time)
return err
}, func(typ sqlparser.StatementType, result *sqltypes.Result) error {
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *Executor) newExecute(
return err
}

if plan.Type != sqlparser.StmtShow {
if plan.QueryType != sqlparser.StmtShow {
safeSession.ClearWarnings()
}

Expand All @@ -157,7 +157,7 @@ func (e *Executor) newExecute(
return err
}
if result != nil {
return recResult(plan.Type, result)
return recResult(plan.QueryType, result)
}

// 4: Prepare for execution.
Expand Down Expand Up @@ -234,7 +234,7 @@ func (e *Executor) handleTransactions(
) (*sqltypes.Result, error) {
// We need to explicitly handle errors, and begin/commit/rollback, since these control transactions. Everything else
// will fall through and be handled through planning
switch plan.Type {
switch plan.QueryType {
case sqlparser.StmtBegin:
qr, err := e.handleBegin(ctx, safeSession, logStats, stmt)
return qr, err
Expand Down Expand Up @@ -390,7 +390,7 @@ func (e *Executor) rollbackPartialExec(ctx context.Context, safeSession *econtex
}

func (e *Executor) setLogStats(logStats *logstats.LogStats, plan *engine.Plan, vcursor *econtext.VCursorImpl, execStart time.Time, err error, qr *sqltypes.Result) {
logStats.StmtType = plan.Type.String()
logStats.StmtType = plan.QueryType.String()
logStats.ActiveKeyspace = vcursor.GetKeyspace()
logStats.TablesUsed = plan.TablesUsed
logStats.TabletType = vcursor.TabletType().String()
Expand All @@ -417,7 +417,7 @@ func (e *Executor) logExecutionEnd(logStats *logstats.LogStats, execStart time.T
func (e *Executor) logPlanningFinished(logStats *logstats.LogStats, plan *engine.Plan) time.Time {
execStart := time.Now()
if plan != nil {
logStats.StmtType = plan.Type.String()
logStats.StmtType = plan.QueryType.String()
}
logStats.PlanTime = execStart.Sub(logStats.StartTime)
return execStart
Expand Down
9 changes: 1 addition & 8 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,7 @@ func BuildFromStmt(ctx context.Context, query string, stmt sqlparser.Statement,
primitive = planResult.primitive
tablesUsed = planResult.tables
}
plan := &engine.Plan{
Type: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
}
return plan, nil
return engine.NewPlan(query, stmt, primitive, bindVarNeeds, tablesUsed), nil
}

func getConfiguredPlanner(vschema plancontext.VSchema, stmt sqlparser.Statement, query string) (stmtPlanner, error) {
Expand Down
Loading

0 comments on commit db6115c

Please sign in to comment.