diff --git a/client/v3/client.go b/client/v3/client.go index 8789acb38c86..aee0aa2cc265 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -384,14 +384,13 @@ func newClient(cfg *Config) (*Client, error) { ctx, cancel := context.WithCancel(baseCtx) client := &Client{ - conn: nil, - cfg: *cfg, - creds: creds, - ctx: ctx, - cancel: cancel, - epMu: new(sync.RWMutex), - callOpts: defaultCallOpts, - lgMu: new(sync.RWMutex), + conn: nil, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, + epMu: new(sync.RWMutex), + lgMu: new(sync.RWMutex), } var err error @@ -414,22 +413,9 @@ func newClient(cfg *Config) (*Client, error) { client.Password = cfg.Password client.authTokenBundle = credentials.NewPerRPCCredentialBundle() } - if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { - if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { - return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize) - } - callOpts := []grpc.CallOption{ - defaultWaitForReady, - defaultMaxCallSendMsgSize, - defaultMaxCallRecvMsgSize, - } - if cfg.MaxCallSendMsgSize > 0 { - callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize) - } - if cfg.MaxCallRecvMsgSize > 0 { - callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize) - } - client.callOpts = callOpts + client.callOpts, err = Options(cfg) + if err != nil { + return nil, err } client.resolver = resolver.New(cfg.Endpoints...) diff --git a/client/v3/kubernetes/client.go b/client/v3/kubernetes/client.go index 4ca0e5fe8c1f..5a7c0728e68a 100644 --- a/client/v3/kubernetes/client.go +++ b/client/v3/kubernetes/client.go @@ -17,6 +17,8 @@ package kubernetes import ( "context" + "google.golang.org/grpc" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -33,6 +35,10 @@ func New(cfg clientv3.Config) (*Client, error) { Client: c, kv: clientv3.RetryKVClient(c), } + kc.callOpts, err = clientv3.Options(&cfg) + if err != nil { + return nil, err + } kc.Kubernetes = kc return kc, nil } @@ -41,12 +47,13 @@ type Client struct { *clientv3.Client Kubernetes Interface kv pb.KVClient + callOpts []grpc.CallOption } var _ Interface = (*Client)(nil) func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { - rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision)) + rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision), k.callOpts...) if err != nil { return resp, clientv3.ContextError(ctx, err) } @@ -66,7 +73,7 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp RangeEnd: []byte(rangeEnd), Limit: opts.Limit, Revision: opts.Revision, - }) + }, k.callOpts...) if err != nil { return resp, clientv3.ContextError(ctx, err) } @@ -81,7 +88,7 @@ func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64 Key: []byte(prefix), RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)), CountOnly: true, - }) + }, k.callOpts...) if err != nil { return 0, clientv3.ContextError(ctx, err) } @@ -145,7 +152,7 @@ func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision if onFailure != nil { txn.Failure = []*pb.RequestOp{onFailure} } - return k.kv.Txn(ctx, txn) + return k.kv.Txn(ctx, txn, k.callOpts...) } func getRequest(key string, revision int64) *pb.RangeRequest { diff --git a/client/v3/options.go b/client/v3/options.go index cc10a03d76d1..9efd66dd35ba 100644 --- a/client/v3/options.go +++ b/client/v3/options.go @@ -15,6 +15,7 @@ package clientv3 import ( + "fmt" "math" "time" @@ -67,3 +68,24 @@ var defaultCallOpts = []grpc.CallOption{ // MaxLeaseTTL is the maximum lease TTL value const MaxLeaseTTL = 9000000000 + +func Options(cfg *Config) ([]grpc.CallOption, error) { + callOpts := defaultCallOpts + if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { + if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { + return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize) + } + callOpts = []grpc.CallOption{ + defaultWaitForReady, + defaultMaxCallSendMsgSize, + defaultMaxCallRecvMsgSize, + } + if cfg.MaxCallSendMsgSize > 0 { + callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize) + } + if cfg.MaxCallRecvMsgSize > 0 { + callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize) + } + } + return callOpts, nil +}