diff --git a/pkg/client/options.go b/pkg/client/options.go index 9c209ea..d722330 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -60,13 +60,6 @@ func getRequestOpts(i *info.RequestInfo, options psrpc.ClientOpts, opts ...psrpc opt(o) } - if o.SelectionOpts.Jitter < 0 { - o.SelectionOpts.Jitter = 0 - } - if o.SelectionOpts.Jitter > 1 { - o.SelectionOpts.Jitter = 1 - } - return *o } diff --git a/pkg/client/rpc.go b/pkg/client/rpc.go index 865a61b..f2a5dde 100644 --- a/pkg/client/rpc.go +++ b/pkg/client/rpc.go @@ -17,8 +17,6 @@ package client import ( "context" "errors" - r "math/rand" - "sort" "time" "google.golang.org/protobuf/proto" @@ -178,52 +176,44 @@ func selectServer( time.AfterFunc(opts.AffinityTimeout, cancel) } - shorted := false - var resErr error - var claims []*internal.ClaimRequest - var resCount int + var ( + shorted bool + serverID string + affinity float32 + claims []*psrpc.Claim + claimCount int + resErr error + ) for { select { case <-ctx.Done(): - if len(claims) > 0 { - sort.Slice(claims, func(i, j int) bool { - return claims[i].Affinity > claims[j].Affinity - }) - - best := claims[0].Affinity - minAffinity := best * (1 - opts.Jitter) - - i := 0 - for ; i < len(claims); i++ { - if claims[i].Affinity < minAffinity { - break - } - } - - return claims[r.Intn(i)].ServerId, nil - } - - if resErr != nil { + switch { + case opts.SelectionFunc != nil: + return opts.SelectionFunc(claims) + case serverID != "": + return serverID, nil + case resErr != nil: return "", resErr - } - - if len(claims) == 0 { - if resCount > 0 { - return "", psrpc.NewErrorf(psrpc.Unavailable, "no servers available (received %d responses)", resCount) - } else { - return "", psrpc.ErrNoResponse - } + case claimCount > 0: + return "", psrpc.NewErrorf(psrpc.Unavailable, "no servers available (received %d responses)", claimCount) + default: + return "", psrpc.ErrNoResponse } case claim := <-claimChan: - resCount++ + claimCount++ if (opts.MinimumAffinity > 0 && claim.Affinity >= opts.MinimumAffinity) || opts.MinimumAffinity <= 0 { if opts.AcceptFirstAvailable || opts.MaximumAffinity > 0 && claim.Affinity >= opts.MaximumAffinity { return claim.ServerId, nil } - claims = append(claims, claim) + if opts.SelectionFunc != nil { + claims = append(claims, &psrpc.Claim{ServerID: claim.ServerId, Affinity: claim.Affinity}) + } else if claim.Affinity > affinity { + serverID = claim.ServerId + affinity = claim.Affinity + } if opts.ShortCircuitTimeout > 0 && !shorted { shorted = true diff --git a/request.go b/request.go index e93d38b..789a2ba 100644 --- a/request.go +++ b/request.go @@ -29,12 +29,17 @@ type RequestOpts struct { } type SelectionOpts struct { - MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler - MaximumAffinity float32 // if > 0, any server returning a max score will be selected immediately - Jitter float32 // randomness applied to selection (0 to 1) - AcceptFirstAvailable bool // go fast - AffinityTimeout time.Duration // server selection deadline - ShortCircuitTimeout time.Duration // deadline imposed after receiving first response + MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler + MaximumAffinity float32 // if > 0, any server returning a max score will be selected immediately + AcceptFirstAvailable bool // go fast + AffinityTimeout time.Duration // server selection deadline + ShortCircuitTimeout time.Duration // deadline imposed after receiving first response + SelectionFunc func([]*Claim) (string, error) // custom server selection function +} + +type Claim struct { + ServerID string + Affinity float32 } func WithRequestTimeout(timeout time.Duration) RequestOption {