From 43f05747ab6166044344de9af4c3ff64a905f530 Mon Sep 17 00:00:00 2001
From: Henry Robinson
Date: Tue, 21 Jan 2025 13:34:06 +0000
Subject: [PATCH 1/2] WIP: use stable client indexes to pick target vtgates

Signed-off-by: Henry Robinson
---
Review notes (this section is ignored by git am):
 * client_index.go: rotor requests use a client with a timeout, the response
   body is closed, JSON decode errors are returned, debug output goes through
   the logger, and the client index is -1 until (and whenever) this host is
   not present in the fetched client list.
 * discovery.go: stableSortTargets orders local hosts before remote hosts with
   a single stable sort; the shuffle fallback now applies to every pool
   instead of only the first one.
 * The blob ids on the "index" lines predate these changes.

 go/vt/vtgateproxy/client_index.go | 145 ++++++++++++++++++++++
 go/vt/vtgateproxy/discovery.go    |  44 ++++++-
 go/vt/vtgateproxy/rotor_json.go   | 186 ++++++++++++++++++++++++++++++
 go/vt/vtgateproxy/vtgateproxy.go  |   4 +
 4 files changed, 374 insertions(+), 5 deletions(-)
 create mode 100644 go/vt/vtgateproxy/client_index.go
 create mode 100644 go/vt/vtgateproxy/rotor_json.go

diff --git a/go/vt/vtgateproxy/client_index.go b/go/vt/vtgateproxy/client_index.go
new file mode 100644
index 00000000000..2d0c988e564
--- /dev/null
+++ b/go/vt/vtgateproxy/client_index.go
@@ -0,0 +1,145 @@
+package vtgateproxy
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sort"
+	"sync"
+	"time"
+
+	"vitess.io/vitess/go/vt/log"
+)
+
+var (
+	clientDiscoveryInterval     = flag.Int("client_discovery_interval_seconds", 30, "how often to update the client discovery")
+	clientDiscoveryEndpoint     = flag.String("client_discovery_endpoint", "http://rotor-http-dev-us-east-1.internal.ec2.tinyspeck.com:50001/v3/discovery:endpoints", "rotor endpoint to query client list")
+	clientDiscoveryResourceType = flag.String("client_discovery_resource_type", "hhvm-metrics@dev-us-east-1", "client resource type")
+
+	// clientDiscoveryHTTPClient bounds every rotor query so that a hung endpoint
+	// cannot block startup or stall the refresh loop.
+	clientDiscoveryHTTPClient = &http.Client{Timeout: 10 * time.Second}
+
+	// clientIndex is this host's position in the sorted client list, or -1 when
+	// it is not known. It is guarded by clientIndexLock.
+	clientIndex     = -1
+	clientIndexLock = &sync.Mutex{}
+)
+
+// getWebappDiscovery POSTs a discovery request for the configured client
+// resource type to the rotor endpoint and decodes the JSON response.
+func getWebappDiscovery() (*RotorJson, error) {
+	payload := fmt.Sprintf(`{"node":{"cluster":"webapp"}, "resource_names":["%s"]}`, *clientDiscoveryResourceType)
+	res, err := clientDiscoveryHTTPClient.Post(*clientDiscoveryEndpoint, "application/json", bytes.NewBufferString(payload))
+	if err != nil {
+		return nil, err
+	}
+	// Close the body on every path, including non-200, so the connection is released.
+	defer res.Body.Close()
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("Non-200 response code from rotor: %d", res.StatusCode)
+	}
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	result := RotorJson{}
+	if err := json.Unmarshal(body, &result); err != nil {
+		return nil, fmt.Errorf("decoding rotor response: %w", err)
+	}
+	return &result, nil
+}
+
+// getHosts returns the node name of every load-balancer endpoint in the rotor
+// response, in the order the response lists them.
+func getHosts() ([]string, error) {
+	discovery, err := getWebappDiscovery()
+	if err != nil {
+		return nil, err
+	}
+	nodenames := []string{}
+	for _, r := range discovery.Resources {
+		for _, e := range r.Endpoints {
+			for _, lb := range e.LBEndpoints {
+				node := lb.Metadata.FilterMetadata.EnvoyLB
+				nodenames = append(nodenames, node.NodeName)
+				log.V(100).Infof("Nodename: %+v", node.NodeName)
+			}
+		}
+	}
+
+	return nodenames, nil
+}
+
+// setClientIndex fetches the client list, sorts it and records this host's
+// position in it. If the list cannot be fetched the previous index is kept; if
+// the list is fetched but does not contain this host the index becomes -1.
+func setClientIndex() error {
+	nodenames, err := getHosts()
+	if err != nil {
+		return err
+	}
+	sort.Strings(nodenames)
+	hostname, err := os.Hostname()
+	if err != nil {
+		return err
+	}
+
+	idx := -1
+	for i, n := range nodenames {
+		if n == hostname {
+			idx = i
+			break
+		}
+	}
+
+	// Publish -1 as well: parse() treats it as "unknown" and falls back to
+	// shuffling instead of reusing a position that is no longer valid.
+	clientIndexLock.Lock()
+	clientIndex = idx
+	clientIndexLock.Unlock()
+
+	if idx == -1 {
+		return fmt.Errorf("hostname '%s' not found (client list length %d)", hostname, len(nodenames))
+	}
+	log.Infof("Client index set to %d", idx)
+	return nil
+}
+
+// clientDiscovery refreshes the client index every clientDiscoveryInterval
+// seconds. It never returns; failures are logged and retried on the next tick.
+func clientDiscovery() {
+	ticker := time.NewTicker(time.Duration(*clientDiscoveryInterval) * time.Second)
+
+	for range ticker.C {
+		if err := setClientIndex(); err != nil {
+			log.Errorf("Failed to set client index: %s", err)
+		}
+	}
+}
+
+// startClientDiscovery computes the client index once synchronously, returning
+// any error, and then starts the background refresh loop.
+func startClientDiscovery() error {
+	if err := setClientIndex(); err != nil {
+		return err
+	}
+
+	go clientDiscovery()
+
+	return nil
+}
+
+// getClientIndex returns the most recently recorded client index, or -1 when
+// this host's position in the client list is not known.
+func getClientIndex() int {
+	clientIndexLock.Lock()
+	defer clientIndexLock.Unlock()
+	return clientIndex
+}
diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go
index 78ea8d179b4..41f64cb8df5 100644
--- a/go/vt/vtgateproxy/discovery.go
+++ b/go/vt/vtgateproxy/discovery.go
@@ -17,12 +17,14 @@ package vtgateproxy
 
 import (
 	"bytes"
+	"cmp"
 	"crypto/sha256"
 	"encoding/json"
 	"fmt"
 	"io"
 	"math/rand"
 	"os"
+	"slices"
 	"sort"
 	"sync"
 	"time"
@@ -326,17 +328,29 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
 	targetCount.ResetAll()
 
 	var selected = map[string][]targetHost{}
-
+	clientIdx := getClientIndex()
 	for poolType := range allTargets {
-		b.sorter.shuffleSort(allTargets[poolType])
+		poolTargets := allTargets[poolType]
+		// Re-read the shared index for each pool so that an unknown index (-1) shuffles every pool, not just the first.
+		idx := clientIdx
+		if idx == -1 {
+			// If we can't find a client index, fall back to randomizing the list and picking the first N local connections
+			b.sorter.shuffleSort(poolTargets)
+			idx = 0
+		} else {
+			stableSortTargets(poolTargets)
+		}
 
 		// try to pick numConnections from the front of the list (local zone) and numBackupConnections
 		// from the tail (remote zone). if that's not possible, just take the whole set
-		if len(allTargets[poolType]) >= b.numConnections+b.numBackupConns {
+		if len(poolTargets) >= b.numConnections+b.numBackupConns {
 			remoteOffset := len(allTargets[poolType]) - b.numBackupConns
-			selected[poolType] = append(allTargets[poolType][:b.numConnections], allTargets[poolType][remoteOffset:]...)
+			for i := 0; i < b.numConnections; i++ {
+				selected[poolType] = append(selected[poolType], poolTargets[(idx+i)%remoteOffset])
+			}
+			selected[poolType] = append(selected[poolType], poolTargets[remoteOffset:]...)
 		} else {
-			selected[poolType] = allTargets[poolType]
+			selected[poolType] = poolTargets
 		}
 
 		targetCount.Set(poolType, int64(len(selected[poolType])))
@@ -407,6 +421,26 @@ func (s *shuffleSorter) shuffleSort(targets []targetHost) {
 	}
 }
 
+// stableSortTargets reorders targets in place so that local hosts come first
+// and remote hosts last, with each group sorted by hostname. The result depends
+// only on the set of targets, not on the order they arrived in.
+func stableSortTargets(targets []targetHost) {
+	slices.SortStableFunc(targets, func(t1 targetHost, t2 targetHost) int {
+		if t1.IsLocal != t2.IsLocal {
+			if t1.IsLocal {
+				return -1
+			}
+			return 1
+		}
+		return cmp.Compare(t1.Hostname, t2.Hostname)
+	})
+}
+
+// DoTest returns the current client index.
+func DoTest() int {
+	return getClientIndex()
+}
+
 // Update the current list of hosts for the given resolver
 func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
 	log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
diff --git a/go/vt/vtgateproxy/rotor_json.go b/go/vt/vtgateproxy/rotor_json.go
new file mode 100644
index 00000000000..9f17f8eee48
--- /dev/null
+++ b/go/vt/vtgateproxy/rotor_json.go
@@ -0,0 +1,186 @@
+package vtgateproxy
+
+// RotorJson is the decoded body of a rotor discovery:endpoints response.
+type RotorJson struct {
+	VersionInfo  string       `json:"version_info"`
+	Resources    []Resource   `json:"resources"`
+	TypeURL      string       `json:"type_url"`
+	ControlPlane ControlPlane `json:"control_plane"`
+}
+
+type ControlPlane struct {
+	Identifier string `json:"identifier"`
+}
+
+type Resource struct {
+	Type        string            `json:"@type"`
+	ClusterName string            `json:"cluster_name"`
+	Endpoints   []EndpointElement `json:"endpoints"`
+}
+
+type EndpointElement struct {
+	LBEndpoints []LBEndpoint `json:"lb_endpoints"`
+}
+
+type LBEndpoint struct {
+	Endpoint     LBEndpointEndpoint `json:"endpoint"`
+	HealthStatus HealthStatus       `json:"health_status"`
+	Metadata     Metadata           `json:"metadata"`
+}
+
+type LBEndpointEndpoint struct {
+	Address Address `json:"address"`
+}
+
+type Address struct {
+	SocketAddress SocketAddress `json:"socket_address"`
+}
+
+type SocketAddress struct {
+	Address   string `json:"address"`
+	PortValue int64  `json:"port_value"`
+}
+
+type Metadata struct {
+	FilterMetadata FilterMetadata `json:"filter_metadata"`
+}
+
+type FilterMetadata struct {
+	EnvoyLB EnvoyLB `json:"envoy.lb"`
+}
+
+type EnvoyLB struct {
+	Consul                   Consul            `json:"consul"`
+	NodeAddress              string            `json:"node-address"`
+	NodeHealth               NodeHealth        `json:"node-health"`
+	NodeID                   string            `json:"node-id"`
+	NodeName                 string            `json:"node-name"`
+	NodeASG                  ASG               `json:"node:asg"`
+	NodeAz                   NodeAz            `json:"node:az"`
+	NodeAzID                 AzID              `json:"node:az_id"`
+	NodeConsulNetworkSegment string            `json:"node:consul-network-segment"`
+	NodeConsulVersion        ConsulVersion     `json:"node:consul-version"`
+	NodeEnv                  Env               `json:"node:env"`
+	NodeLSBRelease           string            `json:"node:lsb_release"`
+	NodeNebulaAddress        string            `json:"node:nebula_address"`
+	NodeOmniServiceID        string            `json:"node:omni_service_id"`
+	NodePlatform             Platform          `json:"node:platform"`
+	NodeProvider             Provider          `json:"node:provider"`
+	NodeRegion               Region            `json:"node:region"`
+	NodeRole                 Role              `json:"node:role"`
+	TagDatacenter            TagDatacenterEnum `json:"tag:datacenter"`
+	TagLegacyDc              TagDatacenterEnum `json:"tag:legacy_dc"`
+}
+
+type Consul struct {
+	NodeMeta NodeMeta `json:"nodeMeta"`
+	Tag      Tag      `json:"tag"`
+}
+
+type NodeMeta struct {
+	ASG                  ASG           `json:"asg"`
+	Az                   NodeAz        `json:"az"`
+	AzID                 AzID          `json:"az_id"`
+	ConsulNetworkSegment string        `json:"consul-network-segment"`
+	ConsulVersion        ConsulVersion `json:"consul-version"`
+	Env                  Env           `json:"env"`
+	LSBRelease           string        `json:"lsb_release"`
+	NebulaAddress        string        `json:"nebula_address"`
+	OmniServiceID        string        `json:"omni_service_id"`
+	Platform             Platform      `json:"platform"`
+	Provider             Provider      `json:"provider"`
+	Region               Region        `json:"region"`
+	Role                 Role          `json:"role"`
+}
+
+type Tag struct {
+	DatacenterDevUsEast1 string `json:"datacenter:dev-us-east-1"`
+	LegacyDcDevUsEast1   string `json:"legacy_dc:dev-us-east-1"`
+}
+
+type HealthStatus string
+
+const (
+	Healthy HealthStatus = "HEALTHY"
+)
+
+type ASG string
+
+const (
+	ASGRdevCanvasWebapp               ASG = "rdev-canvas-webapp"
+	DevContainersPoolLargeWhitecastle ASG = "dev-containers-pool-large-whitecastle"
+	DevContainersPoolTestWhitecastle  ASG = "dev-containers-pool-test-whitecastle"
+	DevContainersPoolWhitecastle      ASG = "dev-containers-pool-whitecastle"
+	Empty                             ASG = ""
+	QAEnvContainerPoolWc              ASG = "qa-env-container-pool-wc"
+	QAEnvSandboxContainerPoolWc       ASG = "qa-env-sandbox-container-pool-wc"
+)
+
+type NodeAz string
+
+const (
+	UsEast1A NodeAz = "us-east-1a"
+	UsEast1B NodeAz = "us-east-1b"
+	UsEast1C NodeAz = "us-east-1c"
+	UsEast1D NodeAz = "us-east-1d"
+)
+
+type AzID string
+
+const (
+	Use1Az1 AzID = "use1-az1"
+	Use1Az2 AzID = "use1-az2"
+	Use1Az4 AzID = "use1-az4"
+	Use1Az6 AzID = "use1-az6"
+)
+
+type ConsulVersion string
+
+const (
+	The1201Slack16 ConsulVersion = "1.20.1+slack.16"
+)
+
+type Env string
+
+const (
+	Dev Env = "dev"
+)
+
+type Platform string
+
+const (
+	Chef Platform = "chef"
+)
+
+type Provider string
+
+const (
+	Ec2 Provider = "ec2"
+)
+
+type Region string
+
+const (
+	UsEast1 Region = "us-east-1"
+)
+
+type Role string
+
+const (
+	RoleRdevCanvasWebapp       Role = "rdev-canvas-webapp"
+	SlackDevContainer          Role = "slack-dev-container"
+	SlackQAEnvContainer        Role = "slack-qa-env-container"
+	SlackQAEnvSandboxContainer Role = "slack-qa-env-sandbox-container"
+)
+
+type NodeHealth string
+
+const (
+	Passing NodeHealth = "passing"
+)
+
+type TagDatacenterEnum string
+
+const (
+	DevUsEast1 TagDatacenterEnum = "dev-us-east-1"
+)
diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go
index c7ff8f2c78e..3e549508f23 100644
--- a/go/vt/vtgateproxy/vtgateproxy.go
+++ b/go/vt/vtgateproxy/vtgateproxy.go
@@ -226,6 +226,10 @@ func Init() {
 		return append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, *balancerType))), nil
 	})
 
+	if err := startClientDiscovery(); err != nil {
+		log.Fatalf("error starting client discovery: %v", err)
+	}
+
 	_, err := RegisterJSONGateResolver(
 		*vtgateHostsFile,
 		*addressField,

From 24ec168793c056a24493e1f67db739eaed5d3de0 Mon Sep 17 00:00:00 2001
From: Henry Robinson
Date: Mon, 3 Feb 2025 13:15:46 +0000
Subject: [PATCH 2/2] Trigger target update if client list changes

Signed-off-by: Henry Robinson
---
Review notes (this section is ignored by git am):
 * clientsChanged compared the indexes with == and so was true exactly when
   nothing had changed; it now uses !=.

 go/vt/vtgateproxy/discovery.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go
index 41f64cb8df5..20b621d0307 100644
--- a/go/vt/vtgateproxy/discovery.go
+++ b/go/vt/vtgateproxy/discovery.go
@@ -188,6 +188,7 @@ func (b *JSONGateResolverBuilder) start() error {
 
 	go func() {
 		var parseErr error
+		lastClientIndex := -1
 		for range b.ticker.C {
 			checkFileStat, err := os.Stat(b.jsonPath)
 			if err != nil {
@@ -213,10 +214,13 @@ func (b *JSONGateResolverBuilder) start() error {
 				continue
 			}
 			parseErr = nil
-			if !contentsChanged {
+			curClientIndex := getClientIndex()
+			clientsChanged := (lastClientIndex != curClientIndex)
+			if !contentsChanged && !clientsChanged {
 				parseCount.Add("unchanged", 1)
 				continue
 			}
+			lastClientIndex = curClientIndex
 			parseCount.Add("changed", 1)
 
 			var wg sync.WaitGroup