diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index 98b7677a1f3..2a85086aae6 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -145,12 +145,11 @@ func CanNormalize(stmt Statement) bool { // CachePlan takes Statement and returns true if the query plan should be cached func CachePlan(stmt Statement) bool { - switch stmt.(type) { - case *Select, *Insert, *Update, *Delete, *Union, *Stream: - return !checkDirective(stmt, DirectiveSkipQueryPlanCache) - default: + _, supportSetVar := stmt.(SupportOptimizerHint) + if !supportSetVar { return false } + return !checkDirective(stmt, DirectiveSkipQueryPlanCache) } // MustRewriteAST takes Statement and returns true if RewriteAST must run on it for correct execution irrespective of user flags. @@ -425,11 +424,11 @@ func IsSimpleTuple(node Expr) bool { return false } -// IsLockingFunc returns true for all functions that are used to work with mysql advisory locks -func IsLockingFunc(node Expr) bool { - switch node.(type) { - case *LockingFunc: +func SupportsOptimizerHint(stmt StatementType) bool { + switch stmt { + case StmtSelect, StmtInsert, StmtUpdate, StmtDelete, StmtStream, StmtVStream: return true + default: + return false } - return false } diff --git a/go/vt/sysvars/sysvars.go b/go/vt/sysvars/sysvars.go index 297ed956bf8..17e495ee9ca 100644 --- a/go/vt/sysvars/sysvars.go +++ b/go/vt/sysvars/sysvars.go @@ -87,6 +87,9 @@ var ( ReadAfterWriteTimeOut = SystemVariable{Name: "read_after_write_timeout"} SessionTrackGTIDs = SystemVariable{Name: "session_track_gtids", IdentifierAsString: true} + // Filled in from VitessAware, ReadOnly, IgnoreThese, NotSupported, UseReservedConn, CheckAndIgnore + AllSystemVariables map[string]SystemVariable + VitessAware = []SystemVariable{ Autocommit, ClientFoundRows, @@ -269,6 +272,31 @@ var ( } ) +func init() { + AllSystemVariables = make(map[string]SystemVariable) + for _, set := range [][]SystemVariable{ + VitessAware, + ReadOnly, + IgnoreThese, + NotSupported, + UseReservedConn, + CheckAndIgnore, + } { + for _, v := range set { + AllSystemVariables[v.Name] = v + } + } +} + +func SupportsSetVar(name string) bool { + sys, ok := AllSystemVariables[name] + if !ok { + return false + } + + return sys.SupportSetVar +} + // GetInterestingVariables is used to return all the variables that may be listed in a SHOW VARIABLES command. func GetInterestingVariables() []string { var res []string diff --git a/go/vt/vtgate/engine/plan.go b/go/vt/vtgate/engine/plan.go index 9ea9f07655c..2df8facb314 100644 --- a/go/vt/vtgate/engine/plan.go +++ b/go/vt/vtgate/engine/plan.go @@ -19,9 +19,14 @@ package engine import ( "bytes" "encoding/json" + "fmt" "sync/atomic" "time" + "vitess.io/vitess/go/cache/theine" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/vt/vthash" + "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" ) @@ -38,6 +43,7 @@ type Plan struct { BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs TablesUsed []string // TablesUsed is the list of tables that this plan will query + QueryHints sqlparser.QueryHints ExecCount uint64 // Count of times this plan was executed ExecTime uint64 // Total execution time @@ -47,6 +53,29 @@ type Plan struct { Errors uint64 // Total number of errors } +type PlanKey struct { + CurrentKeyspace string + Query string + SetVarComment string + Collation collations.ID +} + +func (pk PlanKey) DebugString() string { + return fmt.Sprintf("CurrentKeyspace: %s, Query: %s, SetVarComment: %s, Collation: %d", pk.CurrentKeyspace, pk.Query, pk.SetVarComment, pk.Collation) +} + +func (pk PlanKey) Hash() theine.HashKey256 { + hasher := vthash.New256() + _, _ = hasher.WriteUint16(uint16(pk.Collation)) + _, _ = hasher.WriteString(pk.CurrentKeyspace) + _, _ = hasher.WriteString(pk.SetVarComment) + _, _ = hasher.WriteString(pk.Query) + + var planKey theine.HashKey256 + hasher.Sum(planKey[:0]) + return planKey +} + // AddStats updates the plan execution statistics func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { atomic.AddUint64(&p.ExecCount, execCount) diff --git a/go/vt/vtgate/engine/set.go b/go/vt/vtgate/engine/set.go index f0de330cbed..e5d38352db2 100644 --- a/go/vt/vtgate/engine/set.go +++ b/go/vt/vtgate/engine/set.go @@ -352,11 +352,16 @@ func (svs *SysVarReservedConn) checkAndUpdateSysVar(ctx context.Context, vcursor var buf strings.Builder value.EncodeSQL(&buf) s := buf.String() + if s == `''` { + // SET_VAR(sql_mode, '') is not accepted by MySQL, giving a warning: + // | Warning | 1064 | Optimizer hint syntax error near ''') */ + s = `' '` + } vcursor.Session().SetSysVar(svs.Name, s) // If the condition below is true, we want to use reserved connection instead of SET_VAR query hint. // MySQL supports SET_VAR only in MySQL80 and for a limited set of system variables. - if !svs.SupportSetVar || s == "''" || !vcursor.CanUseSetVar() { + if !svs.SupportSetVar || !vcursor.CanUseSetVar() { vcursor.Session().NeedsReservedConn() return true, nil } diff --git a/go/vt/vtgate/engine/set_test.go b/go/vt/vtgate/engine/set_test.go index 0677ee40bd8..e9e43ea1f41 100644 --- a/go/vt/vtgate/engine/set_test.go +++ b/go/vt/vtgate/engine/set_test.go @@ -451,8 +451,8 @@ func TestSetTable(t *testing.T) { expectedQueryLog: []string{ `ResolveDestinations ks [] Destinations:DestinationKeyspaceID(00)`, `ExecuteMultiShard ks.-20: select @@sql_mode orig, '' new {} false false`, - "SysVar set with (sql_mode,'')", - "Needs Reserved Conn", + "SysVar set with (sql_mode,' ')", + "SET_VAR can be used", }, qr: []*sqltypes.Result{sqltypes.MakeTestResult(sqltypes.MakeTestFields("orig|new", "varchar|varchar"), "a|", @@ -478,7 +478,7 @@ func TestSetTable(t *testing.T) { "|a", )}, }, { - testName: "sql_mode change to empty - non empty orig - MySQL80 - should use reserved conn", + testName: "sql_mode change to empty - non empty orig - MySQL80 - set_var allowed", mysqlVersion: "8.0.0", setOps: []SetOp{ &SysVarReservedConn{ @@ -491,8 +491,8 @@ func TestSetTable(t *testing.T) { expectedQueryLog: []string{ `ResolveDestinations ks [] Destinations:DestinationKeyspaceID(00)`, `ExecuteMultiShard ks.-20: select @@sql_mode orig, '' new {} false false`, - "SysVar set with (sql_mode,'')", - "Needs Reserved Conn", + "SysVar set with (sql_mode,' ')", + "SET_VAR can be used", }, qr: []*sqltypes.Result{sqltypes.MakeTestResult(sqltypes.MakeTestFields("orig|new", "varchar|varchar"), "a|", diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 98cfe503580..6ede24d539c 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1097,11 +1097,16 @@ func (e *Executor) getPlan( if e.VSchema() == nil { return nil, vterrors.VT13001("vschema not initialized") } - - qh, err := sqlparser.BuildQueryHints(stmt) + var setVarComment string + if e.vConfig.SetVarEnabled { + setVarComment = vcursor.PrepareSetVarComment() + } + plan, err := e.getCachedOrBuild(ctx, vcursor, query, stmt, reservedVars, bindVars, allowParameterization, comments, logStats, setVarComment) if err != nil { return nil, err } + + qh := plan.QueryHints vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows) vcursor.SetConsolidator(qh.Consolidator) vcursor.SetWorkloadName(qh.Workload) @@ -1109,10 +1114,46 @@ func (e *Executor) getPlan( vcursor.SetPriority(qh.Priority) vcursor.SetExecQueryTimeout(qh.Timeout) - setVarComment, err := prepareSetVarComment(vcursor, stmt) + if setVarComment != "" { + switch stmt.(type) { + // If the statement is a transaction statement or a `SET`, no reserved connection / SET_VAR is needed + case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint, + *sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show, sqlparser.SupportOptimizerHint: + default: + vcursor.NeedsReservedConn() + } + } + + return plan, nil +} + +func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey { + hasher := vthash.New256() + vcursor.KeyForPlan(ctx, query, hasher) + + var planKey PlanCacheKey + hasher.Sum(planKey[:0]) + return planKey +} + +func (e *Executor) getCachedOrBuild( + ctx context.Context, + vcursor *econtext.VCursorImpl, + query string, + stmt sqlparser.Statement, + reservedVars *sqlparser.ReservedVars, + bindVars map[string]*querypb.BindVariable, + allowParameterization bool, + comments sqlparser.MarginComments, + logStats *logstats.LogStats, + setVarComment string, +) (*engine.Plan, error) { + + qh, err := sqlparser.BuildQueryHints(stmt) if err != nil { return nil, err } + vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks) rewriteASTResult, err := sqlparser.Normalize( stmt, @@ -1138,16 +1179,26 @@ func (e *Executor) getPlan( logStats.SQL = comments.Leading + query + comments.Trailing logStats.BindVariables = sqltypes.CopyBindVariables(bindVars) - return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats) -} + planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan() + if planCachable { + // build Plan key + pk := engine.PlanKey{ + CurrentKeyspace: vcursor.GetKeyspace(), + Query: query, + SetVarComment: setVarComment, + Collation: vcursor.ConnCollation(), + } -func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey { - hasher := vthash.New256() - vcursor.KeyForPlan(ctx, query, hasher) + planKey := pk.Hash() - var planKey PlanCacheKey - hasher.Sum(planKey[:0]) - return planKey + var plan *engine.Plan + var err error + plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) { + return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh) + }) + return plan, err + } + return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh) } func (e *Executor) buildStatement( @@ -1157,6 +1208,7 @@ func (e *Executor) buildStatement( stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, bindVarNeeds *sqlparser.BindVarNeeds, + qh sqlparser.QueryHints, ) (*engine.Plan, error) { plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig) if err != nil { @@ -1164,66 +1216,12 @@ func (e *Executor) buildStatement( } plan.Warnings = vcursor.GetAndEmptyWarnings() + plan.QueryHints = qh err = e.checkThatPlanIsValid(stmt, plan) return plan, err } -func (e *Executor) cacheAndBuildStatement( - ctx context.Context, - vcursor *econtext.VCursorImpl, - query string, - stmt sqlparser.Statement, - reservedVars *sqlparser.ReservedVars, - bindVarNeeds *sqlparser.BindVarNeeds, - logStats *logstats.LogStats, -) (*engine.Plan, error) { - planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan() - if planCachable { - planKey := e.hashPlan(ctx, vcursor, query) - - var plan *engine.Plan - var err error - plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) { - return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds) - }) - return plan, err - } - return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds) -} - -func (e *Executor) canNormalizeStatement(stmt sqlparser.Statement, setVarComment string) bool { - return sqlparser.CanNormalize(stmt) || setVarComment != "" -} - -func prepareSetVarComment(vcursor *econtext.VCursorImpl, stmt sqlparser.Statement) (string, error) { - if vcursor == nil || vcursor.Session().InReservedConn() { - return "", nil - } - - if !vcursor.Session().HasSystemVariables() { - return "", nil - } - - switch stmt.(type) { - // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed - case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint, - *sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show: - return "", nil - case sqlparser.SupportOptimizerHint: - break - default: - vcursor.NeedsReservedConn() - return "", nil - } - - var res strings.Builder - vcursor.Session().GetSystemVariables(func(k, v string) { - res.WriteString(fmt.Sprintf("SET_VAR(%s = %s) ", k, v)) - }) - return strings.TrimSpace(res.String()), nil -} - func (e *Executor) debugCacheEntries() (items map[string]*engine.Plan) { items = make(map[string]*engine.Plan) e.ForEachPlan(func(plan *engine.Plan) bool { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 3d8261495fe..b24cb2572a6 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -192,14 +192,14 @@ func TestSystemVariablesMySQLBelow80(t *testing.T) { wantQueries := []*querypb.BoundQuery{ {Sql: "select @@sql_mode orig, 'only_full_group_by' new"}, {Sql: "set sql_mode = 'only_full_group_by'", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, - {Sql: "select :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, + {Sql: "select /*+ SET_VAR(sql_mode = 'only_full_group_by') */ :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, } - + require.Equal(t, len(wantQueries), len(sbc1.Queries)) utils.MustMatch(t, wantQueries, sbc1.Queries) } func TestSystemVariablesWithSetVarDisabled(t *testing.T) { - executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.0") + executor, sbc1, _, _, _ := createExecutorEnv(t) executor.config.Normalize = true executor.vConfig.SetVarEnabled = false session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -217,10 +217,10 @@ func TestSystemVariablesWithSetVarDisabled(t *testing.T) { _, err := executor.Execute(context.Background(), nil, "TestSetStmt", session, "set @@sql_mode = only_full_group_by", map[string]*querypb.BindVariable{}) require.NoError(t, err) + require.True(t, session.InReservedConn()) _, err = executor.Execute(context.Background(), nil, "TestSelect", session, "select 1 from information_schema.table", map[string]*querypb.BindVariable{}) require.NoError(t, err) - require.True(t, session.InReservedConn()) wantQueries := []*querypb.BoundQuery{ {Sql: "select @@sql_mode orig, 'only_full_group_by' new"}, @@ -381,12 +381,13 @@ func TestSetSystemVariables(t *testing.T) { wantQueries = []*querypb.BoundQuery{ {Sql: "select 1 from dual where @@max_tmp_tables != 1"}, {Sql: "set max_tmp_tables = '1', sql_mode = 'only_full_group_by', sql_safe_updates = '0'", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, - {Sql: "select :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, + // we don't need the set_var since we are in a reserved connection, but since the plan is in the cache, we'll use it + {Sql: "select /*+ SET_VAR(sql_mode = 'only_full_group_by') SET_VAR(sql_safe_updates = '0') */ :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, } utils.MustMatch(t, wantQueries, lookup.Queries) } -func TestSetSystemVariablesWithReservedConnection(t *testing.T) { +func TestSetSystemVariablesWithSetVarInvalidSQLMode(t *testing.T) { executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, SystemVariables: map[string]string{}}) @@ -401,31 +402,28 @@ func TestSetSystemVariablesWithReservedConnection(t *testing.T) { sqltypes.NewVarChar(""), }}, }}) + _, err := executor.Execute(context.Background(), nil, "TestSetStmt", session, "set @@sql_mode = ''", map[string]*querypb.BindVariable{}) require.NoError(t, err) + require.False(t, session.InReservedConn()) _, err = executor.Execute(context.Background(), nil, "TestSelect", session, "select age, city from user group by age", map[string]*querypb.BindVariable{}) require.NoError(t, err) - require.True(t, session.InReservedConn()) wantQueries := []*querypb.BoundQuery{ {Sql: "select @@sql_mode orig, '' new"}, - {Sql: "set sql_mode = ''"}, - {Sql: "select age, city, weight_string(age) from `user` group by age, weight_string(age) order by age asc"}, + {Sql: "select /*+ SET_VAR(sql_mode = ' ') */ age, city, weight_string(age) from `user` group by age, weight_string(age) order by age asc"}, } utils.MustMatch(t, wantQueries, sbc1.Queries) _, err = executor.Execute(context.Background(), nil, "TestSelect", session, "select age, city+1 from user group by age", map[string]*querypb.BindVariable{}) require.NoError(t, err) - require.True(t, session.InReservedConn()) wantQueries = []*querypb.BoundQuery{ {Sql: "select @@sql_mode orig, '' new"}, - {Sql: "set sql_mode = ''"}, - {Sql: "select age, city, weight_string(age) from `user` group by age, weight_string(age) order by age asc"}, - {Sql: "select age, city + :vtg1 /* INT64 */, weight_string(age) from `user` group by age, weight_string(age) order by age asc", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, + {Sql: "select /*+ SET_VAR(sql_mode = ' ') */ age, city, weight_string(age) from `user` group by age, weight_string(age) order by age asc"}, + {Sql: "select /*+ SET_VAR(sql_mode = ' ') */ age, city + :vtg1 /* INT64 */, weight_string(age) from `user` group by age, weight_string(age) order by age asc", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, } utils.MustMatch(t, wantQueries, sbc1.Queries) - require.Equal(t, "''", session.SystemVariables["sql_mode"]) - sbc1.Queries = nil + require.Equal(t, "' '", session.SystemVariables["sql_mode"]) } func TestSelectVindexFunc(t *testing.T) { @@ -449,7 +447,7 @@ func TestCreateTableValidTimestamp(t *testing.T) { query := "create table aa(t timestamp default 0)" _, err := executor.Execute(context.Background(), nil, "TestSelect", session, query, map[string]*querypb.BindVariable{}) require.NoError(t, err) - require.True(t, session.InReservedConn()) + assert.True(t, session.InReservedConn()) wantQueries := []*querypb.BoundQuery{ {Sql: "set sql_mode = ALLOW_INVALID_DATES", BindVariables: map[string]*querypb.BindVariable{}}, diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 19b8f3cbe3a..a4a225608e6 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -29,6 +29,8 @@ import ( "github.com/google/uuid" "golang.org/x/exp/maps" + "vitess.io/vitess/go/vt/sysvars" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/config" "vitess.io/vitess/go/mysql/sqlerror" @@ -213,6 +215,17 @@ func NewVCursorImpl( }, nil } +func (vc *VCursorImpl) PrepareSetVarComment() string { + var res []string + vc.Session().GetSystemVariables(func(k, v string) { + if sysvars.SupportsSetVar(k) { + res = append(res, fmt.Sprintf("SET_VAR(%s = %s) ", k, v)) + } + }) + + return strings.Join(res, " ") +} + func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { callerId := callerid.EffectiveCallerIDFromContext(ctx) immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) diff --git a/go/vt/vthash/highway/highwayhash.go b/go/vt/vthash/highway/highwayhash.go index a922b435d9d..20b9ca4d3a8 100644 --- a/go/vt/vthash/highway/highwayhash.go +++ b/go/vt/vthash/highway/highwayhash.go @@ -142,6 +142,12 @@ func (d *Digest) Write(p []byte) (n int, err error) { return } +func (d *Digest) WriteUint16(x uint16) (int, error) { + var b [2]byte + binary.LittleEndian.PutUint16(b[:], x) + return d.Write(b[:]) +} + func (d *Digest) Sum(b []byte) []byte { state := d.state if d.offset > 0 {