Skip to content
This repository has been archived by the owner on Jul 5, 2022. It is now read-only.

Commit

Permalink
add request context timeout for network plugin (#16)
Browse files Browse the repository at this point in the history
Co-authored-by: nyanpassu <nyanpassu@outlook.com>
  • Loading branch information
雾雨 and nyanpassu authored Mar 1, 2021
1 parent f57cc58 commit e6e79df
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 36 deletions.
13 changes: 8 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Application struct {
DriverName string
IpamDriverName string
DialTimeout time.Duration
RequestTimeout time.Duration
CertFile string
KeyFile string
ShutdownTimeout time.Duration
Expand Down Expand Up @@ -113,7 +114,9 @@ func (app Application) defaultMode() ([]service.Service, error) {
if dockerCli, err = app.getDockerClient(); err != nil {
return nil, err
}
if stor, err = app.getEtcdClient(context.Background(), apiConfig); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), app.RequestTimeout)
defer cancel()
if stor, err = app.getEtcdClient(ctx, apiConfig); err != nil {
return nil, err
}
if gid, err = getDockerGid(); err != nil {
Expand All @@ -134,8 +137,8 @@ func (app Application) defaultMode() ([]service.Service, error) {
hosts: app.Hosts,
},
pluginService{
ipam: fixedIPDriver.NewIpam(vess.FixedIPAllocator()),
driver: fixedIPDriver.NewDriver(client, dockerCli, agent, app.Hostname),
ipam: fixedIPDriver.NewIpam(vess.FixedIPAllocator(), app.RequestTimeout),
driver: fixedIPDriver.NewDriver(client, dockerCli, agent, app.Hostname, app.RequestTimeout),
server: driver.NewPluginServer(app.DriverName, app.IpamDriverName),
})
return services, nil
Expand Down Expand Up @@ -183,8 +186,8 @@ func (app Application) networkPluginOnlyMode() ([]service.Service, error) {
allocator = vessel.NewIPPoolManager(client, dockerCli, app.DriverName, app.Hostname)
return []service.Service{
pluginService{
ipam: calicoDriver.NewIpam(allocator),
driver: calicoDriver.NewDriver(client, dockerCli, app.Hostname),
ipam: calicoDriver.NewIpam(allocator, app.RequestTimeout),
driver: calicoDriver.NewDriver(client, dockerCli, app.Hostname, app.RequestTimeout),
server: driver.NewPluginServer(app.DriverName, app.IpamDriverName),
},
}, nil
Expand Down
7 changes: 6 additions & 1 deletion barrel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func run(c *cli.Context) (err error) {
DriverName: driver.DriverName,
IpamDriverName: driver.DriverName + driver.IpamSuffix,
DialTimeout: time.Duration(6) * time.Second,
RequestTimeout: c.Duration("request-timeout"),
CertFile: c.String("tls-cert"),
KeyFile: c.String("tls-key"),
ShutdownTimeout: time.Duration(30) * time.Second,
Expand All @@ -84,7 +85,6 @@ func main() {
Flags: []cli.Flag{
&cli.StringFlag{
Name: "hostname",
Aliases: []string{"h"},
Usage: "hostname",
EnvVars: []string{"HOSTNAME"},
},
Expand Down Expand Up @@ -132,6 +132,11 @@ func main() {
Usage: "for dial timeout",
Value: time.Second * 2,
},
&cli.DurationFlag{
Name: "request-timeout",
Usage: "for barrel request services(docker, etcd, etc.) timeout",
Value: time.Second * 120,
},
&cli.StringFlag{
Name: "log-level",
Value: "INFO",
Expand Down
16 changes: 12 additions & 4 deletions driver/calico/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"time"

pluginIpam "github.com/docker/go-plugins-helpers/ipam"
caliconet "github.com/projectcalico/libcalico-go/lib/net"
Expand All @@ -18,13 +19,15 @@ import (
type Ipam struct {
vessel.CalicoIPAllocator
utils.LoggerFactory
requestTimeout time.Duration
}

// NewIpam .
func NewIpam(ipAllocator vessel.CalicoIPAllocator) Ipam {
func NewIpam(ipAllocator vessel.CalicoIPAllocator, requestTimeout time.Duration) Ipam {
return Ipam{
CalicoIPAllocator: ipAllocator,
LoggerFactory: utils.NewObjectLogger("CalicoIPIpam"),
requestTimeout: requestTimeout,
}
}

Expand Down Expand Up @@ -73,7 +76,9 @@ func (ipam Ipam) RequestPool(request *pluginIpam.RequestPoolRequest) (*pluginIpa
// If a pool (subnet on the CLI) is specified, it must match one of the
// preconfigured Calico pools.
if request.Pool != "" {
if pool, err = ipam.GetPoolByCIDR(context.Background(), request.Pool); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout)
defer cancel()
if pool, err = ipam.GetPoolByCIDR(ctx, request.Pool); err != nil {
logger.Errorf("request calico pool error, %v", err)
return nil, err
}
Expand Down Expand Up @@ -111,9 +116,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu
var (
address string
err error
ctx = context.Background()
)

ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout)
defer cancel()
if request.Address == "" {
var ipAddr types.IPAddress
if ipAddr, err = ipam.AllocIPFromPool(ctx, request.PoolID); err != nil {
Expand All @@ -135,7 +141,9 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu

// ReleaseAddress .
func (ipam Ipam) ReleaseAddress(request *pluginIpam.ReleaseAddressRequest) error {
return ipam.UnallocIP(context.Background(), types.IP{PoolID: request.PoolID, Address: request.Address})
ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout)
defer cancel()
return ipam.UnallocIP(ctx, types.IP{PoolID: request.PoolID, Address: request.Address})
}

// IPv4ToCidr .
Expand Down
70 changes: 50 additions & 20 deletions driver/calico/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ type Driver struct {

createProfiles bool
labelEndpoints bool

requestTimeout time.Duration
}

// NewDriver .
func NewDriver(
client clientv3.Interface,
dockerCli *dockerClient.Client,
hostname string,
requestTimeout time.Duration,
) Driver {
driver := Driver{
client: client,
Expand All @@ -80,6 +83,7 @@ func NewDriver(

// default: disabled, enable by setting env key to true (case insensitive)
labelEndpoints: strings.EqualFold(os.Getenv(labelEndpointsEnvKey), "true"),
requestTimeout: requestTimeout,
}

ns := os.Getenv(namespaceEnvKey)
Expand Down Expand Up @@ -250,8 +254,6 @@ func (d Driver) DeleteNetwork(request *network.DeleteNetworkRequest) error {

// CreateEndpoint .
func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network.CreateEndpointResponse, error) {
ctx := context.Background()

log.Debugf("Creating endpoint %v\n", request.EndpointID)
if request.Interface.Address == "" && request.Interface.AddressIPv6 == "" {
err := errors.New("No address assigned for endpoint")
Expand Down Expand Up @@ -311,7 +313,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network
endpoint.Spec.IPNetworks = append(endpoint.Spec.IPNetworks, addr.String())
}

pools, err := d.client.IPPools().List(ctx, options.ListOptions{})
getPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelListPoolsCtx()
pools, err := d.client.IPPools().List(getPoolsCtx, options.ListOptions{})
if err != nil {
log.Errorf("Network %v gather error, %v", request.NetworkID, err)
return nil, err
Expand All @@ -337,7 +341,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network
endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, networkName)

// Check if exists
if _, err := d.client.Profiles().Get(ctx, networkName, options.GetOptions{}); err != nil {
getProfileCtx, cancelGetProfileCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelGetProfileCtx()
if _, err := d.client.Profiles().Get(getProfileCtx, networkName, options.GetOptions{}); err != nil {
// If a profile for the network name doesn't exist then it needs to be created.
// We always attempt to create the profile and rely on the datastore to reject
// the request if the profile already exists.
Expand All @@ -353,7 +359,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network
}}},
},
}
if _, err := d.client.Profiles().Create(ctx, profile, options.SetOptions{}); err != nil {
createProfileCtx, cancelCreateProfileCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelCreateProfileCtx()
if _, err := d.client.Profiles().Create(createProfileCtx, profile, options.SetOptions{}); err != nil {
if _, ok := err.(libcalicoErrors.ErrorResourceAlreadyExists); !ok {
log.Errorln(err)
return nil, err
Expand All @@ -363,7 +371,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network
}

// Create the endpoint last to minimize side-effects if something goes wrong.
endpoint, err = d.client.WorkloadEndpoints().Create(ctx, endpoint, options.SetOptions{})
createWepCtx, cancelCreateWepCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelCreateWepCtx()
endpoint, err = d.client.WorkloadEndpoints().Create(createWepCtx, endpoint, options.SetOptions{})
if err != nil {
log.Errorf("Workload endpoints creation error, data: %+v, %v", endpoint, err)
return nil, err
Expand All @@ -389,8 +399,10 @@ func (d Driver) DeleteEndpoint(request *network.DeleteEndpointRequest) error {
return err
}

deleteWepCtx, cancelDeleteWepCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelDeleteWepCtx()
if _, err = d.client.WorkloadEndpoints().Delete(
context.Background(), d.namespace,
deleteWepCtx, d.namespace,
wepName, options.DeleteOptions{}); err != nil {
log.Errorf("Endpoint %v removal error, %v", request.EndpointID, err)
return err
Expand All @@ -405,7 +417,6 @@ func (d Driver) EndpointInfo(request *network.InfoRequest) (*network.InfoRespons

// Join .
func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error) {
ctx := context.Background()
// 1) Set up a veth pair
// The one end will stay in the host network namespace - named caliXXXXX
// The other end is given a temporary name. It's moved into the final network namespace by libnetwork itself.
Expand All @@ -429,7 +440,10 @@ func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error
log.Errorln(err)
return nil, err
}
wep, err := weps.Get(ctx, d.namespace, wepName, options.GetOptions{})

getWepsCtx, cancelGetWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelGetWepsCtx()
wep, err := weps.Get(getWepsCtx, d.namespace, wepName, options.GetOptions{})
if err != nil {
log.Errorln(err)
return nil, err
Expand All @@ -440,7 +454,10 @@ func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error
return nil, err
}
wep.Spec.MAC = tempNIC.Attrs().HardwareAddr.String()
_, err = weps.Update(ctx, wep, options.SetOptions{})

updateWepsCtx, cancelUpdateWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelUpdateWepsCtx()
_, err = weps.Update(updateWepsCtx, wep, options.SetOptions{})
if err != nil {
log.Errorln(err)
return nil, err
Expand Down Expand Up @@ -494,7 +511,9 @@ func (d Driver) FindPoolByNetworkID(networkID string) (*api.IPPool, error) {
err error
)

if pools, err = d.client.IPPools().List(context.Background(), options.ListOptions{}); err != nil {
listPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelListPoolsCtx()
if pools, err = d.client.IPPools().List(listPoolsCtx, options.ListOptions{}); err != nil {
log.Errorf("[calico.NetworkDriver::FindPoolByNetworkID] Network %v gather error, %v", networkID, err)
return nil, err
}
Expand Down Expand Up @@ -542,8 +561,6 @@ func (d Driver) RevokeExternalConnectivity(*network.RevokeExternalConnectivityRe
// container list in the NetworkInspect and make the Container available
// for inspecting.
func (d Driver) populateWorkloadEndpointWithLabels(request *network.CreateEndpointRequest, endpoint *api.WorkloadEndpoint) {
ctx := context.Background()

networkID := request.NetworkID
endpointID := request.EndpointID

Expand All @@ -568,7 +585,9 @@ RETRY_NETWORK_INSPECT:
}

// inspect our custom network
networkData, err := d.dockerCli.NetworkInspect(ctx, networkID, dockerTypes.NetworkInspectOptions{})
inspectNetworkCtx, cancelInspectNetworkCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelInspectNetworkCtx()
networkData, err := d.dockerCli.NetworkInspect(inspectNetworkCtx, networkID, dockerTypes.NetworkInspectOptions{})
if err != nil {
err = errors.Annotatef(err, "Error inspecting network %s - retrying (T=%s)", networkID, time.Since(start))
log.Warningln(err)
Expand Down Expand Up @@ -616,7 +635,9 @@ RETRY_CONTAINER_INSPECT:
return
}

containerInfo, err := d.dockerCli.ContainerInspect(ctx, containerID)
inspectContainerCtx, cancelInspectContainerCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelInspectContainerCtx()
containerInfo, err := d.dockerCli.ContainerInspect(inspectContainerCtx, containerID)
if err != nil {
err = errors.Annotatef(err, "Error inspecting container %s for labels - retrying (T=%s)", containerID, time.Since(start))
log.Warningln(err)
Expand Down Expand Up @@ -655,11 +676,16 @@ RETRY_UPDATE_ENDPOINT:
}

// lets update the workloadEndpoint
_, err = d.client.WorkloadEndpoints().Update(ctx, endpoint, options.SetOptions{})
updateWepsCtx, cancelUpdateWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelUpdateWepsCtx()
_, err = d.client.WorkloadEndpoints().Update(updateWepsCtx, endpoint, options.SetOptions{})
if err != nil {
err = errors.Annotatef(err, "Unable to update WorkloadEndpoint with labels (T=%s)", time.Since(start))
log.Warningln(err)
endpoint, err = d.client.WorkloadEndpoints().Get(ctx, endpoint.Namespace, endpoint.Name, options.GetOptions{})

getWepsCtx, cancelGetWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelGetWepsCtx()
endpoint, err = d.client.WorkloadEndpoints().Get(getWepsCtx, endpoint.Namespace, endpoint.Name, options.GetOptions{})
if err != nil {
err = errors.Annotatef(err, "Unable to get WorkloadEndpoint (T=%s)", time.Since(start))
log.Errorln(err)
Expand All @@ -684,9 +710,11 @@ func (d Driver) generateEndpointName(hostname, endpointID string) (string, error
}

func (d Driver) populatePoolLabel(pools []string, networkID string) error {
ctx := context.Background()
poolClient := d.client.IPPools()
ipPools, err := poolClient.List(ctx, options.ListOptions{})

listPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelListPoolsCtx()
ipPools, err := poolClient.List(listPoolsCtx, options.ListOptions{})
if err != nil {
log.Errorln(err)
return err
Expand All @@ -701,7 +729,9 @@ func (d Driver) populatePoolLabel(pools []string, networkID string) error {
ann[dockerLabelPrefix+"network.ID"] = networkID
ipPool.SetAnnotations(ann)
// TODO need remove nolint and use unittest to cover this case
if _, err = poolClient.Update(ctx, &ipPool, options.SetOptions{}); err != nil { // nolint
updatePoolCtx, cancelUpdatePoolCtx := context.WithTimeout(context.Background(), d.requestTimeout)
defer cancelUpdatePoolCtx()
if _, err = poolClient.Update(updatePoolCtx, &ipPool, options.SetOptions{}); err != nil { // nolint
log.Errorln(err)
return err
}
Expand Down
14 changes: 11 additions & 3 deletions driver/fixedip/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fixedip

import (
"context"
"time"

calicoDriver "github.com/projecteru2/barrel/driver/calico"
"github.com/projecteru2/barrel/types"
Expand All @@ -16,16 +17,19 @@ type Ipam struct {
calicoDriver.Ipam
utils.LoggerFactory
vessel.FixedIPAllocator
requestTimeout time.Duration
}

// NewIpam .
func NewIpam(
allocator vessel.FixedIPAllocator,
requestTimeout time.Duration,
) pluginIpam.Ipam {
return Ipam{
Ipam: calicoDriver.NewIpam(allocator),
Ipam: calicoDriver.NewIpam(allocator, requestTimeout),
LoggerFactory: utils.NewObjectLogger("FixedIPIpam"),
FixedIPAllocator: allocator,
requestTimeout: requestTimeout,
}
}

Expand All @@ -43,8 +47,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu
return ipam.Ipam.RequestAddress(request)
}

ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout)
defer cancel()
if err := ipam.AssignFixedIP(
context.Background(),
ctx,
types.IP{
PoolID: request.PoolID,
Address: request.Address,
Expand All @@ -64,8 +70,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu

// ReleaseAddress .
func (ipam Ipam) ReleaseAddress(request *pluginIpam.ReleaseAddressRequest) error {
ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout)
defer cancel()
if err := ipam.UnassignFixedIP(
context.Background(),
ctx,
types.IP{
PoolID: request.PoolID,
Address: request.Address,
Expand Down
Loading

0 comments on commit e6e79df

Please sign in to comment.