vtgateproxy: sticky random balancer and remote zone backup connections v19 (#598)

* vtgateproxy: sticky random balancer and remote zone backup connections (#593)

* gist of the idea of a sticky_random balancer

* make sticky_random actually work

* include the zone locality in the address attributes

Make it available to the picker layer along with the pool type. Also rework how
the locality is computed to do it as part of building the target host rather
than deferring the comparisons to the sorter.

* include IsLocal in debug output

* add support for local zone affinity to sticky_random

Using the zone-local attribute injected by discovery (if it exists), update
sticky_random so that it biases toward local-zone connections whenever any are
available, otherwise falls back to remote (see the sketch below the change summary).

* add num_backup_conns option to force discovery of non-zone-local vtgates

* sticky_random failure test

* and the rest

* sticky_random rebalance

---------

Co-authored-by: Michael Demmer <mdemmer@slack-corp.com>
dedelala and demmer authored Feb 6, 2025
1 parent 94c7f03 commit aa0835a
Showing 10 changed files with 293 additions and 32 deletions.
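
For orientation before the diffs, here is a minimal sketch of the sticky_random idea described in the commit message, written against gRPC's base balancer API (which this repo already uses for first_ready). It assumes the ZoneLocalAttr address attribute set in discovery.go below and the CONN_ID_KEY context key set in mysql_server.go below; the helper names, the sort, and the modulo-based stickiness are illustrative assumptions, not the exact implementation in this commit.

    package vtgateproxy

    import (
    	"math/rand"
    	"sort"

    	"google.golang.org/grpc/balancer"
    	"google.golang.org/grpc/balancer/base"
    )

    // scWithAddr pairs a READY SubConn with its address so picks can be
    // made order-stable (illustrative helper; not part of this commit).
    type scWithAddr struct {
    	sc   balancer.SubConn
    	addr string
    }

    type stickyPickerBuilder struct{}

    func (*stickyPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
    	var local, remote []scWithAddr
    	for sc, scInfo := range info.ReadySCs {
    		// ZoneLocalAttr is injected by discovery (see discovery.go below).
    		if isLocal, _ := scInfo.Address.Attributes.Value(ZoneLocalAttr).(bool); isLocal {
    			local = append(local, scWithAddr{sc, scInfo.Address.Addr})
    		} else {
    			remote = append(remote, scWithAddr{sc, scInfo.Address.Addr})
    		}
    	}
    	// Zone affinity: only fall back to the remote (backup) connections
    	// when no zone-local vtgate is READY.
    	conns := local
    	if len(conns) == 0 {
    		conns = remote
    	}
    	if len(conns) == 0 {
    		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    	}
    	// Sort for a stable order, so the same connection ID maps to the
    	// same vtgate across picker rebuilds with an unchanged READY set.
    	sort.Slice(conns, func(i, j int) bool { return conns[i].addr < conns[j].addr })
    	return &stickyPicker{conns: conns}
    }

    type stickyPicker struct {
    	conns []scWithAddr
    }

    func (p *stickyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    	// "Sticky": key the choice on the MySQL connection ID stashed in the
    	// context (see mysql_server.go below) so a given client session keeps
    	// hitting the same vtgate; fall back to a plain random pick when no
    	// connection ID is present.
    	i := rand.Intn(len(p.conns))
    	if connID, ok := info.Ctx.Value(CONN_ID_KEY).(int); ok {
    		i = connID % len(p.conns)
    	}
    	return balancer.PickResult{SubConn: p.conns[i].sc}, nil
    }

    func init() {
    	balancer.Register(base.NewBalancerBuilder("sticky_random",
    		&stickyPickerBuilder{}, base.Config{HealthCheck: true}))
    }

The real balancer also has to rehash sessions as targets come and go (the "sticky_random rebalance" commit above), which this sketch glosses over.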
10 changes: 9 additions & 1 deletion go/test/endtoend/vtgateproxy/failure_test.go
@@ -114,6 +114,14 @@ func TestVtgateProxyVtgateFailureRoundRobin(t *testing.T) {
}

func TestVtgateProxyVtgateFailureFirstReady(t *testing.T) {
testVtgateProxyVtgateFailure(t, "first_ready")
}

func TestVtgateProxyVtgateFailureStickyRandom(t *testing.T) {
testVtgateProxyVtgateFailure(t, "sticky_random")
}

func testVtgateProxyVtgateFailure(t *testing.T, balancer string) {
defer cluster.PanicHandler(t)

const targetAffinity = "use1-az1"
@@ -155,7 +163,7 @@ func TestVtgateProxyVtgateFailureFirstReady(t *testing.T) {
clusterInstance.TmpDirectory,
vtgateHostsFile,
targetAffinity,
"first_ready",
balancer,
vtgateproxyConnections,
vtgateproxyHTTPPort,
vtgateproxyGrpcPort,
33 changes: 18 additions & 15 deletions go/test/endtoend/vtgateproxy/rebalance_test.go
@@ -39,6 +39,10 @@ func TestVtgateProxyRebalanceFirstReady(t *testing.T) {
testVtgateProxyRebalance(t, "first_ready")
}

func TestVtgateProxyRebalanceStickyRandom(t *testing.T) {
testVtgateProxyRebalance(t, "sticky_random")
}

func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
defer cluster.PanicHandler(t)

@@ -136,20 +140,6 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}

var expectVtgatesWithQueries int

switch loadBalancer {
case "round_robin":
// Every vtgate should get some queries. We went from 1 vtgates to
// NumConnections+1 vtgates, and then removed the first vtgate.
expectVtgatesWithQueries = len(vtgates)
case "first_ready":
// Only 2 vtgates should have queries. The first vtgate should get all
// queries until it is removed, and then a new vtgate should be picked
// to get all subsequent queries.
expectVtgatesWithQueries = 2
}

var vtgatesWithQueries int
for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
@@ -164,5 +154,18 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
}
}

assert.Equal(t, expectVtgatesWithQueries, vtgatesWithQueries)
switch loadBalancer {
case "round_robin":
// Every vtgate should get some queries. We went from 1 vtgates to
// NumConnections+1 vtgates, and then removed the first vtgate.
assert.Equal(t, len(vtgates), vtgatesWithQueries)
case "first_ready":
// Only 2 vtgates should have queries. The first vtgate should get all
// queries until it is removed, and then a new vtgate should be picked
// to get all subsequent queries.
assert.Equal(t, 2, vtgatesWithQueries)
case "sticky_random":
// 3 or 4 vtgates should get some queries.
assert.Contains(t, []int{3, 4}, vtgatesWithQueries)
}
}
4 changes: 4 additions & 0 deletions go/test/endtoend/vtgateproxy/scale_test.go
@@ -43,6 +43,10 @@ func TestVtgateProxyScaleFirstReady(t *testing.T) {
testVtgateProxyScale(t, "first_ready")
}

func TestVtgateProxyScaleStickyRandom(t *testing.T) {
testVtgateProxyScale(t, "sticky_random")
}

func testVtgateProxyScale(t *testing.T, loadBalancer string) {
defer cluster.PanicHandler(t)

4 changes: 4 additions & 0 deletions go/test/endtoend/vtgateproxy/vtgateproxy_test.go
@@ -41,6 +41,10 @@ func TestVtgateProxyProcessFirstReady(t *testing.T) {
testVtgateProxyProcess(t, "first_ready")
}

func TestVtgateProxyProcessStickyRandom(t *testing.T) {
testVtgateProxyProcess(t, "sticky_random")
}

func testVtgateProxyProcess(t *testing.T, loadBalancer string) {
defer cluster.PanicHandler(t)

44 changes: 30 additions & 14 deletions go/vt/vtgateproxy/discovery.go
@@ -59,6 +59,7 @@ import (
//

const PoolTypeAttr = "PoolType"
const ZoneLocalAttr = "ZoneLocal"

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateResolver struct {
@@ -83,6 +84,7 @@ type JSONGateResolverBuilder struct {
affinityField string
affinityValue string
numConnections int
numBackupConns int

mu sync.RWMutex
targets map[string][]targetHost
@@ -98,6 +100,7 @@ type targetHost struct {
Addr string
PoolType string
Affinity string
IsLocal bool
}

var (
@@ -113,6 +116,7 @@ func RegisterJSONGateResolver(
affinityField string,
affinityValue string,
numConnections int,
numBackupConns int,
) (*JSONGateResolverBuilder, error) {
jsonDiscovery := &JSONGateResolverBuilder{
targets: map[string][]targetHost{},
@@ -123,6 +127,7 @@ func RegisterJSONGateResolver(
affinityField: affinityField,
affinityValue: affinityValue,
numConnections: numConnections,
numBackupConns: numBackupConns,
sorter: newShuffleSorter(),
}

@@ -263,7 +268,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err)
}

var targets = map[string][]targetHost{}
var allTargets = map[string][]targetHost{}
for _, host := range hosts {
hostname, hasHostname := host["host"]
address, hasAddress := host[b.addressField]
@@ -309,8 +314,8 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port)
}

target := targetHost{hostname.(string), fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}
targets[target.PoolType] = append(targets[target.PoolType], target)
target := targetHost{hostname.(string), fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string), affinity == b.affinityValue}
allTargets[target.PoolType] = append(allTargets[target.PoolType], target)
}

// If a pool disappears, the metric will not record this unless all counts
@@ -320,16 +325,25 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
// targets and only resetting pools which disappear.
targetCount.ResetAll()

for poolType := range targets {
b.sorter.shuffleSort(targets[poolType], b.affinityField, b.affinityValue)
if len(targets[poolType]) > *numConnections {
targets[poolType] = targets[poolType][:b.numConnections]
var selected = map[string][]targetHost{}

for poolType := range allTargets {
b.sorter.shuffleSort(allTargets[poolType])

// try to pick numConnections from the front of the list (local zone) and numBackupConnections
// from the tail (remote zone). if that's not possible, just take the whole set
if len(allTargets[poolType]) >= b.numConnections+b.numBackupConns {
remoteOffset := len(allTargets[poolType]) - b.numBackupConns
selected[poolType] = append(allTargets[poolType][:b.numConnections], allTargets[poolType][remoteOffset:]...)
} else {
selected[poolType] = allTargets[poolType]
}
targetCount.Set(poolType, int64(len(targets[poolType])))

targetCount.Set(poolType, int64(len(selected[poolType])))
}

b.mu.Lock()
b.targets = targets
b.targets = selected
b.mu.Unlock()

return true, nil
@@ -353,7 +367,7 @@ func (b *JSONGateResolverBuilder) getTargets(poolType string) []targetHost {
targets = append(targets, b.targets[poolType]...)
b.mu.RUnlock()

b.sorter.shuffleSort(targets, b.affinityField, b.affinityValue)
b.sorter.shuffleSort(targets)

return targets
}
@@ -373,7 +387,7 @@ func newShuffleSorter() *shuffleSorter {
// shuffleSort shuffles a slice of targetHost to ensure every host has a
// different order to iterate through, putting the affinity matching (e.g. same
// az) hosts at the front and the non-matching ones at the end.
func (s *shuffleSorter) shuffleSort(targets []targetHost, affinityField, affinityValue string) {
func (s *shuffleSorter) shuffleSort(targets []targetHost) {
n := len(targets)
head := 0
// Only need to do n-1 swaps since the last host is always in the right place.
@@ -383,7 +397,7 @@ func (s *shuffleSorter) shuffleSort(targets []targetHost, affinityField, affinit
j := head + s.rand.Intn(tail-head+1)
s.mu.Unlock()

if affinityField != "" && affinityValue == targets[j].Affinity {
if targets[j].IsLocal {
targets[head], targets[j] = targets[j], targets[head]
head++
} else {
@@ -406,7 +420,8 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {

var addrs []resolver.Address
for _, target := range targets {
addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)})
attrs := attributes.New(PoolTypeAttr, r.poolType).WithValue(ZoneLocalAttr, target.IsLocal)
addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attrs})
}

// If we've already selected some targets, give the new addresses some time to warm up before removing
@@ -488,12 +503,13 @@ const (
</style>
<table>
{{range $i, $p := .Pools}} <tr>
<th colspan="3">{{$p}}</th>
<th colspan="4">{{$p}}</th>
</tr>
{{range index $.Targets $p}} <tr>
<td>{{.Hostname}}</td>
<td>{{.Addr}}</td>
<td>{{.Affinity}}</td>
<td>{{.IsLocal}}</td>
</tr>{{end}}
{{end}}
</table>
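
As a worked example of the selection logic above (numbers illustrative): with ten discovered hosts in a pool, num_connections=4, and num_backup_conns=2, shuffleSort puts the zone-local hosts at the front, so the proxy keeps allTargets[:4] (local) plus allTargets[8:] (remote backups) and dials six targets in total; a pool with only five hosts fails the length check and is dialed in full.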
5 changes: 3 additions & 2 deletions go/vt/vtgateproxy/firstready_balancer.go
@@ -41,12 +41,13 @@ import (
)

// newBuilder creates a new first_ready balancer builder.
func newBuilder() balancer.Builder {
func newFirstReadyBuilder() balancer.Builder {
return base.NewBalancerBuilder("first_ready", &frPickerBuilder{currentConns: map[string]balancer.SubConn{}}, base.Config{HealthCheck: true})
}

func init() {
balancer.Register(newBuilder())
log.V(1).Infof("registering first_ready balancer")
balancer.Register(newFirstReadyBuilder())
}

// frPickerBuilder implements both the Builder and the Picker interfaces.
11 changes: 11 additions & 0 deletions go/vt/vtgateproxy/mysql_server.go
@@ -219,6 +219,8 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
}
}()

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return sqlerror.NewSQLErrorFromError(err)
@@ -285,6 +287,8 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
}
}(session)

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

_, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
err = sqlerror.NewSQLErrorFromError(err)
if err != nil {
@@ -332,6 +336,8 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
}
}()

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return sqlerror.NewSQLErrorFromError(err)
@@ -396,6 +402,8 @@ func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgatec
options.ClientFoundRows = true
}

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

var err error
session, err = ph.proxy.NewSession(ctx, options, c.Attributes)
if err != nil {
@@ -420,6 +428,9 @@ func (ph *proxyHandler) closeSession(ctx context.Context, c *mysql.Conn) {
if session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

err := ph.proxy.CloseSession(ctx, session)
if err != nil {
log.Errorf("Error happened in transaction rollback: %v", err)
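
Note that each of the ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) lines added above threads the MySQL client connection ID into the outgoing gRPC context on every RPC path; presumably this is the key the sticky_random picker reads to pin a client session to one vtgate (see the sketch near the top).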