Skip to content

Commit

Permalink
fix uint test
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Aug 27, 2024
1 parent dd77e2e commit d16b37d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 33 deletions.
15 changes: 15 additions & 0 deletions core/xdsresource/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package xdsresource
import (
"encoding/json"
"fmt"
"strconv"
"time"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/protobuf/proto"
)

const (
cBErrorRateKey = "kitexCbErrorRate"
)

// RouteConfigResource is used for routing
// HTTPRouteConfig is the native http route config, which consists of a list of virtual hosts.
// ThriftRouteConfig is converted from the routeConfiguration in thrift proxy, which can only be configured in the listener filter
Expand Down Expand Up @@ -64,6 +69,7 @@ type RetryPolicy struct {
NumRetries int
PerTryTimeout time.Duration
PerTryIdleTimeout time.Duration
CBErrorRate float64
RetryBackOff *RetryBackOff
}

Expand Down Expand Up @@ -181,6 +187,15 @@ func unmarshalRoutes(rs []*v3routepb.Route) ([]*Route, error) {
PerTryTimeout: retryPolicy.GetPerTryTimeout().AsDuration(),
PerTryIdleTimeout: retryPolicy.GetPerTryIdleTimeout().AsDuration(),
}
// used for config the errRate.
for _, header := range retryPolicy.GetRetriableHeaders() {
if match := header.GetExactMatch(); match != "" && header.Name == cBErrorRateKey {
errRate, err := strconv.ParseFloat(match, 64)
if err == nil {
route.RetryPolicy.CBErrorRate = errRate
}
}
}
if backoff := retryPolicy.GetRetryBackOff(); backoff != nil {
route.RetryPolicy.RetryBackOff = &RetryBackOff{
BaseInterval: backoff.GetMaxInterval().AsDuration(),
Expand Down
3 changes: 2 additions & 1 deletion xdssuite/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type routerMetaExtractor func(context.Context) map[string]string
type Options struct {
routerMetaExtractor routerMetaExtractor // use metainfo.GetAllValues by default.
servicePort uint32
router *XDSRouter
}

func (o *Options) Apply(opts []Option) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func NewClientSuite(opts ...Option) *clientSuite {
Resolver: NewXDSResolver(),
}),
NewCircuitBreaker(),
NewRetryPolicy(),
NewRetryPolicy(opts...),
}
return &clientSuite{cOpts}
}
Expand Down
62 changes: 33 additions & 29 deletions xdssuite/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ import (
"sync/atomic"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"

"github.com/kitex-contrib/xds/core/xdsresource"
)

const (
wildcardRetryKey = "*"
)

type retrySuit struct {
lastPolicies atomic.Value
*retry.Container
router *XDSRouter
}

func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) {
Expand All @@ -43,7 +42,6 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) {
lastPolicies = val.(map[string]struct{})
}

var wildcarRetryPolicy *retry.Policy
thisPolicies := make(map[string]struct{})
defer rc.lastPolicies.Store(thisPolicies)
for _, resource := range res {
Expand All @@ -65,7 +63,10 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) {
RetrySameNode: false,
StopPolicy: retry.StopPolicy{
MaxRetryTimes: route.RetryPolicy.NumRetries,
MaxDurationMS: uint32(route.RetryPolicy.PerTryTimeout.Milliseconds()),
MaxDurationMS: uint32(route.RetryPolicy.PerTryTimeout.Milliseconds()) * uint32(route.RetryPolicy.NumRetries),
CBPolicy: retry.CBPolicy{
ErrorRate: route.RetryPolicy.CBErrorRate,
},
},
},
}
Expand All @@ -88,22 +89,11 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) {
}
retryPolicy.FailurePolicy.BackOffPolicy = bop
rc.Container.NotifyPolicyChange(cluster.Name, retryPolicy)
// FIXME: The logic of retry is before the router, the value of key RouterClusterKey
// can't be found, so use wildcard temporarily and set the global policy here. And it recommend
// using envoyfilter to config the retry policy.
wildcarRetryPolicy = retryPolicy.DeepCopy()
}
}
}
}

if wildcarRetryPolicy == nil {
wildcarRetryPolicy = &retry.Policy{
Enable: false,
}
}
rc.Container.NotifyPolicyChange(wildcardRetryKey, *wildcarRetryPolicy)

for key := range lastPolicies {
if _, ok := thisPolicies[key]; !ok {
rc.Container.DeletePolicy(key)
Expand All @@ -112,34 +102,48 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) {
}

// NewRetryPolicy integrate xds config and kitex circuitbreaker
func NewRetryPolicy() client.Option {
func NewRetryPolicy(opts ...Option) client.Option {
m := xdsResourceManager.getManager()
if m == nil {
panic("xds resource manager has not been initialized")
}

retry := &retrySuit{
Container: retry.NewRetryContainer(retry.WithCustomizeKeyFunc(genRetryServiceKey)),
rs := &retrySuit{
router: NewXDSRouter(opts...),
}
rs.Container = retry.NewRetryContainer(retry.WithCustomizeKeyFunc(rs.genRetryServiceKey))

m.RegisterXDSUpdateHandler(xdsresource.RouteConfigType, func(res map[string]xdsresource.Resource) {
updateRetryPolicy(retry, res)
updateRetryPolicy(rs, res)
})
return client.WithRetryContainer(retry.Container)
return client.WithRetryContainer(rs.Container)
}

// keep consistent when initialising the circuit breaker suit and updating
// the retry policy.
func genRetryServiceKey(ctx context.Context, ri rpcinfo.RPCInfo) string {
func (rs *retrySuit) genRetryServiceKey(ctx context.Context, ri rpcinfo.RPCInfo) string {
if ri == nil {
return ""
}
dest := ri.To()
if dest == nil {
return ""
}

// the value of RouterClusterKey is stored in route process.
// FIXME: The logic of retry is before the router, the value of key RouterClusterKey
// can't be found, so use wildcard temporarily.
key, _ := ri.To().Tag(RouterClusterKey)
if key == "" {
return wildcardRetryKey
key, exist := ri.To().Tag(RouterClusterKey)
if exist {
return key
}
res, err := rs.router.Route(ctx, ri)
if err != nil {
klog.Warnf("[XDS] get router key failed err: %v", err)
return ""
}
return key
// set destination
_ = remoteinfo.AsRemoteInfo(dest).SetTag(RouterClusterKey, res.ClusterPicked)
remoteinfo.AsRemoteInfo(dest).SetTagLock(RouterClusterKey)
// set timeout
_ = rpcinfo.AsMutableRPCConfig(ri.Config()).SetRPCTimeout(res.RPCTimeout)
return res.ClusterPicked
}
6 changes: 3 additions & 3 deletions xdssuite/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestRetry(t *testing.T) {
RetrySameNode: false,
StopPolicy: retry.StopPolicy{
MaxRetryTimes: 2,
MaxDurationMS: 1000,
MaxDurationMS: 2000,
},
BackOffPolicy: &retry.BackOffPolicy{
BackOffType: retry.FixedBackOffType,
Expand All @@ -137,7 +137,7 @@ func TestRetry(t *testing.T) {
RetrySameNode: false,
StopPolicy: retry.StopPolicy{
MaxRetryTimes: 3,
MaxDurationMS: 1000,
MaxDurationMS: 3000,
},
BackOffPolicy: &retry.BackOffPolicy{
BackOffType: retry.RandomBackOffType,
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestRetry(t *testing.T) {
RetrySameNode: false,
StopPolicy: retry.StopPolicy{
MaxRetryTimes: 2,
MaxDurationMS: 5000,
MaxDurationMS: 10000,
},
BackOffPolicy: &retry.BackOffPolicy{
BackOffType: retry.NoneBackOffType,
Expand Down
5 changes: 5 additions & 0 deletions xdssuite/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func NewXDSRouterMiddleware(opts ...Option) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request, response interface{}) error {
ri := rpcinfo.GetRPCInfo(ctx)
_, exist := ri.To().Tag(RouterClusterKey)
if exist {
return next(ctx, request, response)
}

dest := ri.To()
if dest == nil {
return kerrors.ErrNoDestService
Expand Down

0 comments on commit d16b37d

Please sign in to comment.