Skip to content

Commit

Permalink
add destination to plan hash key and make all test green
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Feb 17, 2025
1 parent 6f637c8 commit 5f1bdea
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 33 deletions.
5 changes: 3 additions & 2 deletions go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (

"vitess.io/vitess/go/cache/theine"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vthash"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vthash"
)

// Plan represents the execution strategy for a given query.
Expand All @@ -55,6 +54,7 @@ type Plan struct {

type PlanKey struct {
CurrentKeyspace string
Destination string
Query string
SetVarComment string
Collation collations.ID
Expand All @@ -68,6 +68,7 @@ func (pk PlanKey) Hash() theine.HashKey256 {
hasher := vthash.New256()
_, _ = hasher.WriteUint16(uint16(pk.Collation))
_, _ = hasher.WriteString(pk.CurrentKeyspace)
_, _ = hasher.WriteString(pk.Destination)
_, _ = hasher.WriteString(pk.SetVarComment)
_, _ = hasher.WriteString(pk.Query)

Expand Down
57 changes: 36 additions & 21 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/spf13/pflag"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/vtgate/dynamicconfig"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache/theine"
"vitess.io/vitess/go/mysql/capabilities"
Expand All @@ -49,6 +47,7 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
Expand All @@ -58,6 +57,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/dynamicconfig"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
Expand All @@ -67,7 +67,6 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
"vitess.io/vitess/go/vt/vthash"
)

var (
Expand Down Expand Up @@ -1127,15 +1126,6 @@ func (e *Executor) getPlan(
return plan, nil
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
hasher := vthash.New256()
vcursor.KeyForPlan(ctx, query, hasher)

var planKey PlanCacheKey
hasher.Sum(planKey[:0])
return planKey
}

func (e *Executor) getCachedOrBuild(
ctx context.Context,
vcursor *econtext.VCursorImpl,
Expand Down Expand Up @@ -1182,14 +1172,7 @@ func (e *Executor) getCachedOrBuild(
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
// build Plan key
pk := engine.PlanKey{
CurrentKeyspace: vcursor.GetKeyspace(),
Query: query,
SetVarComment: setVarComment,
Collation: vcursor.ConnCollation(),
}

planKey := pk.Hash()
planKey := e.hashPlan(ctx, vcursor, query, setVarComment)

var plan *engine.Plan
var err error
Expand All @@ -1201,6 +1184,38 @@ func (e *Executor) getCachedOrBuild(
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh)
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string, setVarComment string) theine.HashKey256 {
var allDest []string
currDest := vcursor.Destination()
if currDest != nil {
switch currDest.(type) {
case key.DestinationKeyspaceID, key.DestinationKeyspaceIDs:
resolved, _, err := vcursor.ResolveDestinations(ctx, vcursor.GetKeyspace(), nil, []key.Destination{currDest})
if err == nil && len(resolved) > 0 {
shards := make([]string, len(resolved))
for i := 0; i < len(shards); i++ {
shards[i] = resolved[i].Target.GetShard()
}
sort.Strings(shards)
allDest = shards
}
default:
allDest = []string{currDest.String()}
}
}

pk := engine.PlanKey{
CurrentKeyspace: vcursor.GetKeyspace(),
Destination: strings.Join(allDest, ","),
Query: query,
SetVarComment: setVarComment,
Collation: vcursor.ConnCollation(),
}

planKey := pk.Hash()
return planKey
}

func (e *Executor) buildStatement(
ctx context.Context,
vcursor *econtext.VCursorImpl,
Expand Down
20 changes: 19 additions & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"runtime"
"strconv"
"strings"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"

Expand Down Expand Up @@ -384,7 +386,23 @@ func TestSetSystemVariables(t *testing.T) {
// we don't need the set_var since we are in a reserved connection, but since the plan is in the cache, we'll use it
{Sql: "select /*+ SET_VAR(sql_mode = 'only_full_group_by') SET_VAR(sql_safe_updates = '0') */ :vtg1 /* INT64 */ from information_schema.`table`", BindVariables: map[string]*querypb.BindVariable{"vtg1": {Type: sqltypes.Int64, Value: []byte("1")}}},
}
utils.MustMatch(t, wantQueries, lookup.Queries)

diffOpts := []cmp.Option{
cmp.Comparer(func(a, b proto.Message) bool {
return proto.Equal(a, b)
}),
cmp.Exporter(func(reflect.Type) bool {
return true
}),
}
diff := cmp.Diff(wantQueries, lookup.Queries, diffOpts...)
if diff == "" {
return
}
// try again with rearranged SET_VAR hints
wantQueries[2].Sql = "select /*+ SET_VAR(sql_safe_updates = '0') SET_VAR(sql_mode = 'only_full_group_by') */ :vtg1 /* INT64 */ from information_schema.`table`"
diff = cmp.Diff(wantQueries, lookup.Queries, diffOpts...)
assert.Empty(t, diff)
}

func TestSetSystemVariablesWithSetVarInvalidSQLMode(t *testing.T) {
Expand Down
15 changes: 6 additions & 9 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *econtext.VCursorImpl, sq
return true
})
} else {
h := e.hashPlan(context.Background(), vc, sql)
h := e.hashPlan(context.Background(), vc, sql, "")
plan, _ = e.plans.Get(h, e.epoch.Load())
}
require.Truef(t, plan != nil, "plan not found for query: %s", sql)
Expand Down Expand Up @@ -1652,29 +1652,26 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
assertCacheSize(t, r.plans, 0)

wantSQL := query1 + " /* comment */"
if logStats1.SQL != wantSQL {
t.Errorf("logstats sql want \"%s\" got \"%s\"", wantSQL, logStats1.SQL)
}
assert.Equal(t, wantSQL, logStats1.SQL)

_, logStats2 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment 2 */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 1)

wantSQL = query1 + " /* comment 2 */"
if logStats2.SQL != wantSQL {
t.Errorf("logstats sql want \"%s\" got \"%s\"", wantSQL, logStats2.SQL)
}
assert.Equal(t, wantSQL, logStats2.SQL)
})

t.Run("Skip Cache", func(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)

unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig)

// Skip cache using directive
query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 0)

// it will be cached now.
query1 = "insert into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 1)
Expand Down Expand Up @@ -1738,7 +1735,7 @@ func TestGetPlanNormalized(t *testing.T) {
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{})

query1 := "select * from music_user_map where id = 1"
query1 := "select * from music_user_map where id = 1" // 163 -- 80
query2 := "select * from music_user_map where id = 2"
normalized := "select * from music_user_map where id = :id /* INT64 */"

Expand Down

0 comments on commit 5f1bdea

Please sign in to comment.