Skip to content

Commit

Permalink
Fix first ready bug
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Robinson <hrobinson@slack-corp.com>
  • Loading branch information
henryr committed Jun 5, 2024
1 parent d951c5e commit 82c4df9
Showing 1 changed file with 27 additions and 30 deletions.
57 changes: 27 additions & 30 deletions go/vt/vtgateproxy/firstready_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ limitations under the License.

import (
"errors"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/resolver"

"vitess.io/vitess/go/vt/log"
)
Expand All @@ -54,8 +55,16 @@ func init() {
// Once a conn is chosen and is in the ready state, it will remain as the
// active subconn even if other connections become available.
type frPickerBuilder struct {
mu sync.Mutex
currentConn balancer.SubConn
}

type frConnWithAddr struct {
subConn balancer.SubConn
addr resolver.Address
}

type frPicker struct {
subConns []frConnWithAddr
cur uint32
}

func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
Expand All @@ -65,36 +74,24 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
return base.NewErrPicker(errors.New("no available connections"))
}

f.mu.Lock()
defer f.mu.Unlock()

// If we've already chosen a subconn, and it is still in the ready list, then
// no need to change state
if f.currentConn != nil {
log.V(100).Infof("first_ready: currentConn is active, checking if still ready")
for sc := range info.ReadySCs {
if f.currentConn == sc {
log.V(100).Infof("first_ready: currentConn still active - not changing")
return f
}
}
subConns := make([]frConnWithAddr, len(info.ReadySCs))
idx := 0
for sc, k := range info.ReadySCs {
subConns[idx] = frConnWithAddr{subConn: sc, addr: k.Address}
idx++
}

// Otherwise either we don't have an active conn or the conn we were using is
// no longer active, so pick an arbitrary new one out of the map.
log.V(100).Infof("first_ready: currentConn is not active, picking a new one")
for sc := range info.ReadySCs {
f.currentConn = sc
break
}

return f
return &frPicker{subConns: subConns, cur: 0}
}

// Pick simply returns the currently chosen conn
func (f *frPickerBuilder) Pick(balancer.PickInfo) (balancer.PickResult, error) {
f.mu.Lock()
defer f.mu.Unlock()

return balancer.PickResult{SubConn: f.currentConn}, nil
func (f *frPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
curIndex := atomic.LoadUint32(&f.cur)
return balancer.PickResult{SubConn: f.subConns[curIndex].subConn, Done: func(info balancer.DoneInfo) {
if info.Err != nil {
// Only try to move the index at most 1 - if someone else raced and advanced it, do nothing.
nextIndex := (curIndex + 1) % uint32(len(f.subConns))
atomic.CompareAndSwapUint32(&f.cur, curIndex, nextIndex)
}
}}, nil
}

0 comments on commit 82c4df9

Please sign in to comment.