From 5f1bdeaf9b0bfcbe9b6e16cd79087c04e9da558a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 17 Feb 2025 22:00:13 +0530 Subject: [PATCH] add destination to plan hash key and make all test green Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/plan.go | 5 ++- go/vt/vtgate/executor.go | 57 ++++++++++++++++++---------- go/vt/vtgate/executor_select_test.go | 20 +++++++++- go/vt/vtgate/executor_test.go | 15 +++----- 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/go/vt/vtgate/engine/plan.go b/go/vt/vtgate/engine/plan.go index 2df8facb314..18aee6fff6d 100644 --- a/go/vt/vtgate/engine/plan.go +++ b/go/vt/vtgate/engine/plan.go @@ -25,10 +25,9 @@ import ( "vitess.io/vitess/go/cache/theine" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/vt/vthash" - "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vthash" ) // Plan represents the execution strategy for a given query. @@ -55,6 +54,7 @@ type Plan struct { type PlanKey struct { CurrentKeyspace string + Destination string Query string SetVarComment string Collation collations.ID @@ -68,6 +68,7 @@ func (pk PlanKey) Hash() theine.HashKey256 { hasher := vthash.New256() _, _ = hasher.WriteUint16(uint16(pk.Collation)) _, _ = hasher.WriteString(pk.CurrentKeyspace) + _, _ = hasher.WriteString(pk.Destination) _, _ = hasher.WriteString(pk.SetVarComment) _, _ = hasher.WriteString(pk.Query) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 6ede24d539c..0b96c01324f 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net/http" + "sort" "strings" "sync" "sync/atomic" @@ -30,9 +31,6 @@ import ( "github.com/spf13/pflag" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - "vitess.io/vitess/go/vt/vtgate/dynamicconfig" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" "vitess.io/vitess/go/mysql/capabilities" @@ -49,6 +47,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" @@ -58,6 +57,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/dynamicconfig" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" @@ -67,7 +67,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" - "vitess.io/vitess/go/vt/vthash" ) var ( @@ -1127,15 +1126,6 @@ func (e *Executor) getPlan( return plan, nil } -func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey { - hasher := vthash.New256() - vcursor.KeyForPlan(ctx, query, hasher) - - var planKey PlanCacheKey - hasher.Sum(planKey[:0]) - return planKey -} - func (e *Executor) getCachedOrBuild( ctx context.Context, vcursor *econtext.VCursorImpl, @@ -1182,14 +1172,7 @@ func (e *Executor) getCachedOrBuild( planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan() if planCachable { // build Plan key - pk := engine.PlanKey{ - CurrentKeyspace: vcursor.GetKeyspace(), - Query: query, - SetVarComment: setVarComment, - Collation: vcursor.ConnCollation(), - } - - planKey := pk.Hash() + planKey := e.hashPlan(ctx, vcursor, query, setVarComment) var plan *engine.Plan var err error @@ -1201,6 +1184,38 @@ func (e *Executor) getCachedOrBuild( return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh) } +func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string, setVarComment string) theine.HashKey256 { + var allDest []string + currDest := vcursor.Destination() + if currDest != nil { + switch currDest.(type) { + case key.DestinationKeyspaceID, key.DestinationKeyspaceIDs: + resolved, _, err := vcursor.ResolveDestinations(ctx, vcursor.GetKeyspace(), nil, []key.Destination{currDest}) + if err == nil && len(resolved) > 0 { + shards := make([]string, len(resolved)) + for i := 0; i < len(shards); i++ { + shards[i] = resolved[i].Target.GetShard() + } + sort.Strings(shards) + allDest = shards + } + default: + allDest = []string{currDest.String()} + } + } + + pk := engine.PlanKey{ + CurrentKeyspace: vcursor.GetKeyspace(), + Destination: strings.Join(allDest, ","), + Query: query, + SetVarComment: setVarComment, + Collation: vcursor.ConnCollation(), + } + + planKey := pk.Hash() + return planKey +} + func (e *Executor) buildStatement( ctx context.Context, vcursor *econtext.VCursorImpl, diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index b24cb2572a6..4c178ff9110 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "reflect" "runtime" "strconv" "strings" @@ -29,6 +30,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" @@ -384,7 +386,23 @@ func TestSetSystemVariables(t *testing.T) { // we don't need the set_var since we are in a reserved connection, but since the plan is in the cache, we'll use it {Sql: "select /*+ SET_VAR(sql_mode = 'only_full_group_by') SET_VAR(sql_safe_updates = '0') */ :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}}, } - utils.MustMatch(t, wantQueries, lookup.Queries) + + diffOpts := []cmp.Option{ + cmp.Comparer(func(a, b proto.Message) bool { + return proto.Equal(a, b) + }), + cmp.Exporter(func(reflect.Type) bool { + return true + }), + } + diff := cmp.Diff(wantQueries, lookup.Queries, diffOpts...) + if diff == "" { + return + } + // try again with rearranged SET_VAR hints + wantQueries[2].Sql = "select /*+ SET_VAR(sql_safe_updates = '0') SET_VAR(sql_mode = 'only_full_group_by') */ :vtg1 /* INT64 */ from information_schema.`table`" + diff = cmp.Diff(wantQueries, lookup.Queries, diffOpts...) + assert.Empty(t, diff) } func TestSetSystemVariablesWithSetVarInvalidSQLMode(t *testing.T) { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index a3afa932db2..acb58bacc71 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1617,7 +1617,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *econtext.VCursorImpl, sq return true }) } else { - h := e.hashPlan(context.Background(), vc, sql) + h := e.hashPlan(context.Background(), vc, sql, "") plan, _ = e.plans.Get(h, e.epoch.Load()) } require.Truef(t, plan != nil, "plan not found for query: %s", sql) @@ -1652,29 +1652,26 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 0) wantSQL := query1 + " /* comment */" - if logStats1.SQL != wantSQL { - t.Errorf("logstats sql want \"%s\" got \"%s\"", wantSQL, logStats1.SQL) - } + assert.Equal(t, wantSQL, logStats1.SQL) _, logStats2 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment 2 */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 1) wantSQL = query1 + " /* comment 2 */" - if logStats2.SQL != wantSQL { - t.Errorf("logstats sql want \"%s\" got \"%s\"", wantSQL, logStats2.SQL) - } + assert.Equal(t, wantSQL, logStats2.SQL) }) t.Run("Skip Cache", func(t *testing.T) { - // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) + // Skip cache using directive query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 0) + // it will be cached now. query1 = "insert into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 1) @@ -1738,7 +1735,7 @@ func TestGetPlanNormalized(t *testing.T) { emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) - query1 := "select * from music_user_map where id = 1" + query1 := "select * from music_user_map where id = 1" // 163 -- 80 query2 := "select * from music_user_map where id = 2" normalized := "select * from music_user_map where id = :id /* INT64 */"