Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Stable client indexes for target selection #588

Open
wants to merge 2 commits into
base: vtgateproxy-15
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions go/vt/vtgateproxy/client_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package vtgateproxy

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/log"
)

var (
clientDiscoveryInterval = flag.Int("client_discovery_interval_seconds", 30, "how often to update the client discovery")
clientDiscoveryEndpoint = flag.String("client_discovery_endpoint", "http://rotor-http-dev-us-east-1.internal.ec2.tinyspeck.com:50001/v3/discovery:endpoints", "rotor endpoint to query client list")
clientDiscoveryResourceType = flag.String("client_discovery_resource_type", "hhvm-metrics@dev-us-east-1", "client resource type")
clientIndex = 0
clientIndexLock = &sync.Mutex{}
)

func getWebappDiscovery() (*RotorJson, error) {
payload := fmt.Sprintf(`{"node":{"cluster":"webapp"}, "resource_names":["%s"]}`, *clientDiscoveryResourceType)
res, err := http.Post(*clientDiscoveryEndpoint, "application/json", bytes.NewBuffer([]byte(payload)))
if err != nil {
return nil, err
}

if res.StatusCode != 200 {
return nil, fmt.Errorf("Non-200 response code from rotor: %d", res.StatusCode)
}
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
result := RotorJson{}
json.Unmarshal(body, &result)
return &result, nil
}

func getHosts() ([]string, error) {
discovery, err := getWebappDiscovery()
if err != nil {
return nil, err
}
nodenames := []string{}
for _, r := range discovery.Resources {
for _, e := range r.Endpoints {
for _, lb := range e.LBEndpoints {
node := lb.Metadata.FilterMetadata.EnvoyLB
nodenames = append(nodenames, node.NodeName)
fmt.Printf("Nodename: %+v\n", node.NodeName)
}
}
}

return nodenames, nil
}

func setClientIndex() error {
nodenames, err := getHosts()
if err != nil {
return err
}
sort.Strings(nodenames)
hostname, err := os.Hostname()
if err != nil {
return err
}

for i, n := range nodenames {
if n == hostname {
clientIndexLock.Lock()
defer clientIndexLock.Unlock()
clientIndex = i
log.Infof("Client index set to %d", i)
return nil
}
}

return fmt.Errorf("hostname '%s' not found (client list length %d)", hostname, len(nodenames))
}

func clientDiscovery() {
ticker := time.NewTicker(time.Duration(*clientDiscoveryInterval) * time.Second)

for range ticker.C {
if err := setClientIndex(); err != nil {
log.Errorf("Failed to set client index: %s", err)
}
}
}

func startClientDiscovery() error {
if err := setClientIndex(); err != nil {
return err
}

go clientDiscovery()

return nil
}

func getClientIndex() int {
clientIndexLock.Lock()
defer clientIndexLock.Unlock()
return clientIndex
}
55 changes: 49 additions & 6 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import (
"bytes"
"cmp"

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)

Check failure on line 20 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package cmp is not in GOROOT (/usr/local/go/src/cmp)
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"slices"

Check failure on line 27 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package slices is not in GOROOT (/usr/local/go/src/slices)

Check failure on line 27 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package slices is not in GOROOT (/usr/local/go/src/slices)

Check failure on line 27 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package slices is not in GOROOT (/usr/local/go/src/slices)

Check failure on line 27 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

package slices is not in GOROOT (/usr/local/go/src/slices)
"sort"
"sync"
"time"
Expand Down Expand Up @@ -186,6 +188,7 @@

go func() {
var parseErr error
lastClientIndex := -1
for range b.ticker.C {
checkFileStat, err := os.Stat(b.jsonPath)
if err != nil {
Expand All @@ -211,10 +214,13 @@
continue
}
parseErr = nil
if !contentsChanged {
curClientIndex := getClientIndex()
clientsChanged := (lastClientIndex == curClientIndex)
if !contentsChanged && !clientsChanged {
parseCount.Add("unchanged", 1)
continue
}
lastClientIndex = curClientIndex
parseCount.Add("changed", 1)

var wg sync.WaitGroup
Expand Down Expand Up @@ -326,17 +332,27 @@
targetCount.ResetAll()

var selected = map[string][]targetHost{}

idx := getClientIndex()
for poolType := range allTargets {
b.sorter.shuffleSort(allTargets[poolType])
poolTargets := allTargets[poolType]
if idx == -1 {
// If we can't find a client index, fall back to randomizing the list and picking the first N local connections
b.sorter.shuffleSort(poolTargets)
idx = 0
} else {
stableSortTargets(poolTargets)
}

// try to pick numConnections from the front of the list (local zone) and numBackupConnections
// from the tail (remote zone). if that's not possible, just take the whole set
if len(allTargets[poolType]) >= b.numConnections+b.numBackupConns {
if len(poolTargets) >= b.numConnections+b.numBackupConns {
remoteOffset := len(allTargets[poolType]) - b.numBackupConns
selected[poolType] = append(allTargets[poolType][:b.numConnections], allTargets[poolType][remoteOffset:]...)
for i := 0; i < b.numConnections; i++ {
selected[poolType] = append(selected[poolType], poolTargets[(idx+i)%remoteOffset])
}
selected[poolType] = append(selected[poolType], poolTargets[remoteOffset:]...)
} else {
selected[poolType] = allTargets[poolType]
selected[poolType] = poolTargets
}

targetCount.Set(poolType, int64(len(selected[poolType])))
Expand Down Expand Up @@ -407,6 +423,33 @@
}
}

func stableSortTargets(targets []targetHost) {
head, tail := 0, len(targets)

for i, t := range targets {
if t.IsLocal {
targets[head], targets[i] = targets[i], targets[head]
head += 1
} else {
targets[tail-1], targets[i] = targets[i], targets[tail-1]
tail -= 1
}
}

slices.SortStableFunc(targets[:head], func(t1 targetHost, t2 targetHost) int {
return cmp.Compare(t1.Hostname, t2.Hostname)
})

slices.SortStableFunc(targets[head:], func(t1 targetHost, t2 targetHost) int {
return cmp.Compare(t1.Hostname, t2.Hostname)
})

}

func DoTest() int {
return getClientIndex()
}

// Update the current list of hosts for the given resolver
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
Expand Down
Loading
Loading