diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index 98b7677a1f3..c64da4f031c 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -425,11 +425,11 @@ func IsSimpleTuple(node Expr) bool { return false } -// IsLockingFunc returns true for all functions that are used to work with mysql advisory locks -func IsLockingFunc(node Expr) bool { - switch node.(type) { - case *LockingFunc: +func SupportsOptimizerHint(stmt StatementType) bool { + switch stmt { + case StmtSelect, StmtInsert, StmtUpdate, StmtDelete, StmtStream, StmtVStream: return true + default: + return false } - return false } diff --git a/go/vt/vtgate/engine/plan.go b/go/vt/vtgate/engine/plan.go index 9ea9f07655c..2df8facb314 100644 --- a/go/vt/vtgate/engine/plan.go +++ b/go/vt/vtgate/engine/plan.go @@ -19,9 +19,14 @@ package engine import ( "bytes" "encoding/json" + "fmt" "sync/atomic" "time" + "vitess.io/vitess/go/cache/theine" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/vt/vthash" + "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" ) @@ -38,6 +43,7 @@ type Plan struct { BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs TablesUsed []string // TablesUsed is the list of tables that this plan will query + QueryHints sqlparser.QueryHints ExecCount uint64 // Count of times this plan was executed ExecTime uint64 // Total execution time @@ -47,6 +53,29 @@ type Plan struct { Errors uint64 // Total number of errors } +type PlanKey struct { + CurrentKeyspace string + Query string + SetVarComment string + Collation collations.ID +} + +func (pk PlanKey) DebugString() string { + return fmt.Sprintf("CurrentKeyspace: %s, Query: %s, SetVarComment: %s, Collation: %d", pk.CurrentKeyspace, pk.Query, pk.SetVarComment, pk.Collation) +} + +func (pk PlanKey) Hash() theine.HashKey256 { + hasher := vthash.New256() + _, _ = hasher.WriteUint16(uint16(pk.Collation)) + _, _ = hasher.WriteString(pk.CurrentKeyspace) + _, _ = hasher.WriteString(pk.SetVarComment) + _, _ = hasher.WriteString(pk.Query) + + var planKey theine.HashKey256 + hasher.Sum(planKey[:0]) + return planKey +} + // AddStats updates the plan execution statistics func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { atomic.AddUint64(&p.ExecCount, execCount) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 98cfe503580..78f1e78b527 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1098,10 +1098,12 @@ func (e *Executor) getPlan( return nil, vterrors.VT13001("vschema not initialized") } - qh, err := sqlparser.BuildQueryHints(stmt) + plan, err := e.getCachedOrBuild(ctx, vcursor, query, stmt, reservedVars, bindVars, allowParameterization, comments, logStats) if err != nil { return nil, err } + + qh := plan.QueryHints vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows) vcursor.SetConsolidator(qh.Consolidator) vcursor.SetWorkloadName(qh.Workload) @@ -1109,10 +1111,48 @@ func (e *Executor) getPlan( vcursor.SetPriority(qh.Priority) vcursor.SetExecQueryTimeout(qh.Timeout) - setVarComment, err := prepareSetVarComment(vcursor, stmt) + // TODO: do this after getting a plan. + // if needsReservedConn(plan.Type) { + // switch plan.Type { + // // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed + // case sqlparser.StmtBegin, sqlparser.StmtCommit, sqlparser.StmtRollback, sqlparser.StmtSavepoint, + // sqlparser.StmtSRollback, sqlparser.StmtRelease, sqlparser.StmtSet, sqlparser.StmtShow: + // case sqlparser.SupportsOptimizerHint(plan.Type): + // default: + // vc.NeedsReservedConn() + // return "", nil + // } + + return plan, nil +} + +func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey { + hasher := vthash.New256() + vcursor.KeyForPlan(ctx, query, hasher) + + var planKey PlanCacheKey + hasher.Sum(planKey[:0]) + return planKey +} + +func (e *Executor) getCachedOrBuild( + ctx context.Context, + vcursor *econtext.VCursorImpl, + query string, + stmt sqlparser.Statement, + reservedVars *sqlparser.ReservedVars, + bindVars map[string]*querypb.BindVariable, + allowParameterization bool, + comments sqlparser.MarginComments, + logStats *logstats.LogStats, +) (*engine.Plan, error) { + setVarComment := vcursor.PrepareSetVarComment() + + qh, err := sqlparser.BuildQueryHints(stmt) if err != nil { return nil, err } + vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks) rewriteASTResult, err := sqlparser.Normalize( stmt, @@ -1138,16 +1178,26 @@ func (e *Executor) getPlan( logStats.SQL = comments.Leading + query + comments.Trailing logStats.BindVariables = sqltypes.CopyBindVariables(bindVars) - return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats) -} + planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan() + if planCachable { + // build Plan key + pk := engine.PlanKey{ + CurrentKeyspace: vcursor.GetKeyspace(), + Query: query, + SetVarComment: setVarComment, + Collation: vcursor.ConnCollation(), + } -func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey { - hasher := vthash.New256() - vcursor.KeyForPlan(ctx, query, hasher) + planKey := pk.Hash() - var planKey PlanCacheKey - hasher.Sum(planKey[:0]) - return planKey + var plan *engine.Plan + var err error + plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) { + return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh) + }) + return plan, err + } + return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh) } func (e *Executor) buildStatement( @@ -1157,6 +1207,7 @@ func (e *Executor) buildStatement( stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, bindVarNeeds *sqlparser.BindVarNeeds, + qh sqlparser.QueryHints, ) (*engine.Plan, error) { plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig) if err != nil { @@ -1164,66 +1215,12 @@ func (e *Executor) buildStatement( } plan.Warnings = vcursor.GetAndEmptyWarnings() + plan.QueryHints = qh err = e.checkThatPlanIsValid(stmt, plan) return plan, err } -func (e *Executor) cacheAndBuildStatement( - ctx context.Context, - vcursor *econtext.VCursorImpl, - query string, - stmt sqlparser.Statement, - reservedVars *sqlparser.ReservedVars, - bindVarNeeds *sqlparser.BindVarNeeds, - logStats *logstats.LogStats, -) (*engine.Plan, error) { - planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan() - if planCachable { - planKey := e.hashPlan(ctx, vcursor, query) - - var plan *engine.Plan - var err error - plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) { - return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds) - }) - return plan, err - } - return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds) -} - -func (e *Executor) canNormalizeStatement(stmt sqlparser.Statement, setVarComment string) bool { - return sqlparser.CanNormalize(stmt) || setVarComment != "" -} - -func prepareSetVarComment(vcursor *econtext.VCursorImpl, stmt sqlparser.Statement) (string, error) { - if vcursor == nil || vcursor.Session().InReservedConn() { - return "", nil - } - - if !vcursor.Session().HasSystemVariables() { - return "", nil - } - - switch stmt.(type) { - // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed - case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint, - *sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show: - return "", nil - case sqlparser.SupportOptimizerHint: - break - default: - vcursor.NeedsReservedConn() - return "", nil - } - - var res strings.Builder - vcursor.Session().GetSystemVariables(func(k, v string) { - res.WriteString(fmt.Sprintf("SET_VAR(%s = %s) ", k, v)) - }) - return strings.TrimSpace(res.String()), nil -} - func (e *Executor) debugCacheEntries() (items map[string]*engine.Plan) { items = make(map[string]*engine.Plan) e.ForEachPlan(func(plan *engine.Plan) bool { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 3d8261495fe..5e750cfce75 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -449,7 +449,7 @@ func TestCreateTableValidTimestamp(t *testing.T) { query := "create table aa(t timestamp default 0)" _, err := executor.Execute(context.Background(), nil, "TestSelect", session, query, map[string]*querypb.BindVariable{}) require.NoError(t, err) - require.True(t, session.InReservedConn()) + assert.True(t, session.InReservedConn()) wantQueries := []*querypb.BoundQuery{ {Sql: "set sql_mode = ALLOW_INVALID_DATES", BindVariables: map[string]*querypb.BindVariable{}}, diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 19b8f3cbe3a..8824acffa6b 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -213,6 +213,38 @@ func NewVCursorImpl( }, nil } +func (vc *VCursorImpl) PrepareSetVarComment() string { + // if vc.Session().InReservedConn() { + // return "" + // } + + // TODO: handle foreign keys as it changes the plan. + // verify it, mostly nothing to do here. + + // if !vc.Session().HasSystemVariables() { + // return "", nil + // } + + // TODO: do this after getting a plan. + // switch stmt.(type) { + // // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed + // case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint, + // *sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show: + // return "", nil + // case sqlparser.SupportOptimizerHint: + // break + // default: + // vc.NeedsReservedConn() + // return "", nil + // } + + var res strings.Builder + vc.Session().GetSystemVariables(func(k, v string) { + res.WriteString(fmt.Sprintf("SET_VAR(%s = %s) ", k, v)) + }) + return strings.TrimSpace(res.String()) +} + func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { callerId := callerid.EffectiveCallerIDFromContext(ctx) immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) diff --git a/go/vt/vthash/highway/highwayhash.go b/go/vt/vthash/highway/highwayhash.go index a922b435d9d..20b9ca4d3a8 100644 --- a/go/vt/vthash/highway/highwayhash.go +++ b/go/vt/vthash/highway/highwayhash.go @@ -142,6 +142,12 @@ func (d *Digest) Write(p []byte) (n int, err error) { return } +func (d *Digest) WriteUint16(x uint16) (int, error) { + var b [2]byte + binary.LittleEndian.PutUint16(b[:], x) + return d.Write(b[:]) +} + func (d *Digest) Sum(b []byte) []byte { state := d.state if d.offset > 0 {