diff --git a/app/app.go b/app/app.go index dabfd7d..0bb118b 100644 --- a/app/app.go +++ b/app/app.go @@ -37,6 +37,7 @@ type Application struct { DriverName string IpamDriverName string DialTimeout time.Duration + RequestTimeout time.Duration CertFile string KeyFile string ShutdownTimeout time.Duration @@ -113,7 +114,9 @@ func (app Application) defaultMode() ([]service.Service, error) { if dockerCli, err = app.getDockerClient(); err != nil { return nil, err } - if stor, err = app.getEtcdClient(context.Background(), apiConfig); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), app.RequestTimeout) + defer cancel() + if stor, err = app.getEtcdClient(ctx, apiConfig); err != nil { return nil, err } if gid, err = getDockerGid(); err != nil { @@ -134,8 +137,8 @@ func (app Application) defaultMode() ([]service.Service, error) { hosts: app.Hosts, }, pluginService{ - ipam: fixedIPDriver.NewIpam(vess.FixedIPAllocator()), - driver: fixedIPDriver.NewDriver(client, dockerCli, agent, app.Hostname), + ipam: fixedIPDriver.NewIpam(vess.FixedIPAllocator(), app.RequestTimeout), + driver: fixedIPDriver.NewDriver(client, dockerCli, agent, app.Hostname, app.RequestTimeout), server: driver.NewPluginServer(app.DriverName, app.IpamDriverName), }) return services, nil @@ -183,8 +186,8 @@ func (app Application) networkPluginOnlyMode() ([]service.Service, error) { allocator = vessel.NewIPPoolManager(client, dockerCli, app.DriverName, app.Hostname) return []service.Service{ pluginService{ - ipam: calicoDriver.NewIpam(allocator), - driver: calicoDriver.NewDriver(client, dockerCli, app.Hostname), + ipam: calicoDriver.NewIpam(allocator, app.RequestTimeout), + driver: calicoDriver.NewDriver(client, dockerCli, app.Hostname, app.RequestTimeout), server: driver.NewPluginServer(app.DriverName, app.IpamDriverName), }, }, nil diff --git a/barrel.go b/barrel.go index f119aa6..96bf1d5 100644 --- a/barrel.go +++ b/barrel.go @@ -63,6 +63,7 @@ func run(c *cli.Context) (err error) { DriverName: driver.DriverName, IpamDriverName: driver.DriverName + driver.IpamSuffix, DialTimeout: time.Duration(6) * time.Second, + RequestTimeout: c.Duration("request-timeout"), CertFile: c.String("tls-cert"), KeyFile: c.String("tls-key"), ShutdownTimeout: time.Duration(30) * time.Second, @@ -84,7 +85,6 @@ func main() { Flags: []cli.Flag{ &cli.StringFlag{ Name: "hostname", - Aliases: []string{"h"}, Usage: "hostname", EnvVars: []string{"HOSTNAME"}, }, @@ -132,6 +132,11 @@ func main() { Usage: "for dial timeout", Value: time.Second * 2, }, + &cli.DurationFlag{ + Name: "request-timeout", + Usage: "for barrel request services(docker, etcd, etc.) timeout", + Value: time.Second * 120, + }, &cli.StringFlag{ Name: "log-level", Value: "INFO", diff --git a/driver/calico/ipam.go b/driver/calico/ipam.go index 180c1ea..391a958 100644 --- a/driver/calico/ipam.go +++ b/driver/calico/ipam.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "time" pluginIpam "github.com/docker/go-plugins-helpers/ipam" caliconet "github.com/projectcalico/libcalico-go/lib/net" @@ -18,13 +19,15 @@ import ( type Ipam struct { vessel.CalicoIPAllocator utils.LoggerFactory + requestTimeout time.Duration } // NewIpam . -func NewIpam(ipAllocator vessel.CalicoIPAllocator) Ipam { +func NewIpam(ipAllocator vessel.CalicoIPAllocator, requestTimeout time.Duration) Ipam { return Ipam{ CalicoIPAllocator: ipAllocator, LoggerFactory: utils.NewObjectLogger("CalicoIPIpam"), + requestTimeout: requestTimeout, } } @@ -73,7 +76,9 @@ func (ipam Ipam) RequestPool(request *pluginIpam.RequestPoolRequest) (*pluginIpa // If a pool (subnet on the CLI) is specified, it must match one of the // preconfigured Calico pools. if request.Pool != "" { - if pool, err = ipam.GetPoolByCIDR(context.Background(), request.Pool); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout) + defer cancel() + if pool, err = ipam.GetPoolByCIDR(ctx, request.Pool); err != nil { logger.Errorf("request calico pool error, %v", err) return nil, err } @@ -111,9 +116,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu var ( address string err error - ctx = context.Background() ) + ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout) + defer cancel() if request.Address == "" { var ipAddr types.IPAddress if ipAddr, err = ipam.AllocIPFromPool(ctx, request.PoolID); err != nil { @@ -135,7 +141,9 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu // ReleaseAddress . func (ipam Ipam) ReleaseAddress(request *pluginIpam.ReleaseAddressRequest) error { - return ipam.UnallocIP(context.Background(), types.IP{PoolID: request.PoolID, Address: request.Address}) + ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout) + defer cancel() + return ipam.UnallocIP(ctx, types.IP{PoolID: request.PoolID, Address: request.Address}) } // IPv4ToCidr . diff --git a/driver/calico/network.go b/driver/calico/network.go index ab5e60e..cd659f5 100644 --- a/driver/calico/network.go +++ b/driver/calico/network.go @@ -52,6 +52,8 @@ type Driver struct { createProfiles bool labelEndpoints bool + + requestTimeout time.Duration } // NewDriver . @@ -59,6 +61,7 @@ func NewDriver( client clientv3.Interface, dockerCli *dockerClient.Client, hostname string, + requestTimeout time.Duration, ) Driver { driver := Driver{ client: client, @@ -80,6 +83,7 @@ func NewDriver( // default: disabled, enable by setting env key to true (case insensitive) labelEndpoints: strings.EqualFold(os.Getenv(labelEndpointsEnvKey), "true"), + requestTimeout: requestTimeout, } ns := os.Getenv(namespaceEnvKey) @@ -250,8 +254,6 @@ func (d Driver) DeleteNetwork(request *network.DeleteNetworkRequest) error { // CreateEndpoint . func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network.CreateEndpointResponse, error) { - ctx := context.Background() - log.Debugf("Creating endpoint %v\n", request.EndpointID) if request.Interface.Address == "" && request.Interface.AddressIPv6 == "" { err := errors.New("No address assigned for endpoint") @@ -311,7 +313,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network endpoint.Spec.IPNetworks = append(endpoint.Spec.IPNetworks, addr.String()) } - pools, err := d.client.IPPools().List(ctx, options.ListOptions{}) + getPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelListPoolsCtx() + pools, err := d.client.IPPools().List(getPoolsCtx, options.ListOptions{}) if err != nil { log.Errorf("Network %v gather error, %v", request.NetworkID, err) return nil, err @@ -337,7 +341,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, networkName) // Check if exists - if _, err := d.client.Profiles().Get(ctx, networkName, options.GetOptions{}); err != nil { + getProfileCtx, cancelGetProfileCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelGetProfileCtx() + if _, err := d.client.Profiles().Get(getProfileCtx, networkName, options.GetOptions{}); err != nil { // If a profile for the network name doesn't exist then it needs to be created. // We always attempt to create the profile and rely on the datastore to reject // the request if the profile already exists. @@ -353,7 +359,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network }}}, }, } - if _, err := d.client.Profiles().Create(ctx, profile, options.SetOptions{}); err != nil { + createProfileCtx, cancelCreateProfileCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelCreateProfileCtx() + if _, err := d.client.Profiles().Create(createProfileCtx, profile, options.SetOptions{}); err != nil { if _, ok := err.(libcalicoErrors.ErrorResourceAlreadyExists); !ok { log.Errorln(err) return nil, err @@ -363,7 +371,9 @@ func (d Driver) CreateEndpoint(request *network.CreateEndpointRequest) (*network } // Create the endpoint last to minimize side-effects if something goes wrong. - endpoint, err = d.client.WorkloadEndpoints().Create(ctx, endpoint, options.SetOptions{}) + createWepCtx, cancelCreateWepCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelCreateWepCtx() + endpoint, err = d.client.WorkloadEndpoints().Create(createWepCtx, endpoint, options.SetOptions{}) if err != nil { log.Errorf("Workload endpoints creation error, data: %+v, %v", endpoint, err) return nil, err @@ -389,8 +399,10 @@ func (d Driver) DeleteEndpoint(request *network.DeleteEndpointRequest) error { return err } + deleteWepCtx, cancelDeleteWepCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelDeleteWepCtx() if _, err = d.client.WorkloadEndpoints().Delete( - context.Background(), d.namespace, + deleteWepCtx, d.namespace, wepName, options.DeleteOptions{}); err != nil { log.Errorf("Endpoint %v removal error, %v", request.EndpointID, err) return err @@ -405,7 +417,6 @@ func (d Driver) EndpointInfo(request *network.InfoRequest) (*network.InfoRespons // Join . func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error) { - ctx := context.Background() // 1) Set up a veth pair // The one end will stay in the host network namespace - named caliXXXXX // The other end is given a temporary name. It's moved into the final network namespace by libnetwork itself. @@ -429,7 +440,10 @@ func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error log.Errorln(err) return nil, err } - wep, err := weps.Get(ctx, d.namespace, wepName, options.GetOptions{}) + + getWepsCtx, cancelGetWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelGetWepsCtx() + wep, err := weps.Get(getWepsCtx, d.namespace, wepName, options.GetOptions{}) if err != nil { log.Errorln(err) return nil, err @@ -440,7 +454,10 @@ func (d Driver) Join(request *network.JoinRequest) (*network.JoinResponse, error return nil, err } wep.Spec.MAC = tempNIC.Attrs().HardwareAddr.String() - _, err = weps.Update(ctx, wep, options.SetOptions{}) + + updateWepsCtx, cancelUpdateWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelUpdateWepsCtx() + _, err = weps.Update(updateWepsCtx, wep, options.SetOptions{}) if err != nil { log.Errorln(err) return nil, err @@ -494,7 +511,9 @@ func (d Driver) FindPoolByNetworkID(networkID string) (*api.IPPool, error) { err error ) - if pools, err = d.client.IPPools().List(context.Background(), options.ListOptions{}); err != nil { + listPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelListPoolsCtx() + if pools, err = d.client.IPPools().List(listPoolsCtx, options.ListOptions{}); err != nil { log.Errorf("[calico.NetworkDriver::FindPoolByNetworkID] Network %v gather error, %v", networkID, err) return nil, err } @@ -542,8 +561,6 @@ func (d Driver) RevokeExternalConnectivity(*network.RevokeExternalConnectivityRe // container list in the NetworkInspect and make the Container available // for inspecting. func (d Driver) populateWorkloadEndpointWithLabels(request *network.CreateEndpointRequest, endpoint *api.WorkloadEndpoint) { - ctx := context.Background() - networkID := request.NetworkID endpointID := request.EndpointID @@ -568,7 +585,9 @@ RETRY_NETWORK_INSPECT: } // inspect our custom network - networkData, err := d.dockerCli.NetworkInspect(ctx, networkID, dockerTypes.NetworkInspectOptions{}) + inspectNetworkCtx, cancelInspectNetworkCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelInspectNetworkCtx() + networkData, err := d.dockerCli.NetworkInspect(inspectNetworkCtx, networkID, dockerTypes.NetworkInspectOptions{}) if err != nil { err = errors.Annotatef(err, "Error inspecting network %s - retrying (T=%s)", networkID, time.Since(start)) log.Warningln(err) @@ -616,7 +635,9 @@ RETRY_CONTAINER_INSPECT: return } - containerInfo, err := d.dockerCli.ContainerInspect(ctx, containerID) + inspectContainerCtx, cancelInspectContainerCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelInspectContainerCtx() + containerInfo, err := d.dockerCli.ContainerInspect(inspectContainerCtx, containerID) if err != nil { err = errors.Annotatef(err, "Error inspecting container %s for labels - retrying (T=%s)", containerID, time.Since(start)) log.Warningln(err) @@ -655,11 +676,16 @@ RETRY_UPDATE_ENDPOINT: } // lets update the workloadEndpoint - _, err = d.client.WorkloadEndpoints().Update(ctx, endpoint, options.SetOptions{}) + updateWepsCtx, cancelUpdateWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelUpdateWepsCtx() + _, err = d.client.WorkloadEndpoints().Update(updateWepsCtx, endpoint, options.SetOptions{}) if err != nil { err = errors.Annotatef(err, "Unable to update WorkloadEndpoint with labels (T=%s)", time.Since(start)) log.Warningln(err) - endpoint, err = d.client.WorkloadEndpoints().Get(ctx, endpoint.Namespace, endpoint.Name, options.GetOptions{}) + + getWepsCtx, cancelGetWepsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelGetWepsCtx() + endpoint, err = d.client.WorkloadEndpoints().Get(getWepsCtx, endpoint.Namespace, endpoint.Name, options.GetOptions{}) if err != nil { err = errors.Annotatef(err, "Unable to get WorkloadEndpoint (T=%s)", time.Since(start)) log.Errorln(err) @@ -684,9 +710,11 @@ func (d Driver) generateEndpointName(hostname, endpointID string) (string, error } func (d Driver) populatePoolLabel(pools []string, networkID string) error { - ctx := context.Background() poolClient := d.client.IPPools() - ipPools, err := poolClient.List(ctx, options.ListOptions{}) + + listPoolsCtx, cancelListPoolsCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelListPoolsCtx() + ipPools, err := poolClient.List(listPoolsCtx, options.ListOptions{}) if err != nil { log.Errorln(err) return err @@ -701,7 +729,9 @@ func (d Driver) populatePoolLabel(pools []string, networkID string) error { ann[dockerLabelPrefix+"network.ID"] = networkID ipPool.SetAnnotations(ann) // TODO need remove nolint and use unittest to cover this case - if _, err = poolClient.Update(ctx, &ipPool, options.SetOptions{}); err != nil { // nolint + updatePoolCtx, cancelUpdatePoolCtx := context.WithTimeout(context.Background(), d.requestTimeout) + defer cancelUpdatePoolCtx() + if _, err = poolClient.Update(updatePoolCtx, &ipPool, options.SetOptions{}); err != nil { // nolint log.Errorln(err) return err } diff --git a/driver/fixedip/ipam.go b/driver/fixedip/ipam.go index 3500260..7f894e8 100644 --- a/driver/fixedip/ipam.go +++ b/driver/fixedip/ipam.go @@ -2,6 +2,7 @@ package fixedip import ( "context" + "time" calicoDriver "github.com/projecteru2/barrel/driver/calico" "github.com/projecteru2/barrel/types" @@ -16,16 +17,19 @@ type Ipam struct { calicoDriver.Ipam utils.LoggerFactory vessel.FixedIPAllocator + requestTimeout time.Duration } // NewIpam . func NewIpam( allocator vessel.FixedIPAllocator, + requestTimeout time.Duration, ) pluginIpam.Ipam { return Ipam{ - Ipam: calicoDriver.NewIpam(allocator), + Ipam: calicoDriver.NewIpam(allocator, requestTimeout), LoggerFactory: utils.NewObjectLogger("FixedIPIpam"), FixedIPAllocator: allocator, + requestTimeout: requestTimeout, } } @@ -43,8 +47,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu return ipam.Ipam.RequestAddress(request) } + ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout) + defer cancel() if err := ipam.AssignFixedIP( - context.Background(), + ctx, types.IP{ PoolID: request.PoolID, Address: request.Address, @@ -64,8 +70,10 @@ func (ipam Ipam) RequestAddress(request *pluginIpam.RequestAddressRequest) (*plu // ReleaseAddress . func (ipam Ipam) ReleaseAddress(request *pluginIpam.ReleaseAddressRequest) error { + ctx, cancel := context.WithTimeout(context.Background(), ipam.requestTimeout) + defer cancel() if err := ipam.UnassignFixedIP( - context.Background(), + ctx, types.IP{ PoolID: request.PoolID, Address: request.Address, diff --git a/driver/fixedip/network.go b/driver/fixedip/network.go index f157d82..e25e275 100644 --- a/driver/fixedip/network.go +++ b/driver/fixedip/network.go @@ -1,6 +1,8 @@ package fixedip import ( + "time" + dockerClient "github.com/docker/docker/client" "github.com/docker/go-plugins-helpers/network" "github.com/projectcalico/libcalico-go/lib/clientv3" @@ -21,9 +23,10 @@ func NewDriver( dockerCli *dockerClient.Client, agent vessel.CNMAgent, hostname string, + requestTimeout time.Duration, ) Driver { return Driver{ - Driver: calicoDriver.NewDriver(client, dockerCli, hostname), + Driver: calicoDriver.NewDriver(client, dockerCli, hostname, requestTimeout), agent: agent, } } diff --git a/vessel/calicoippool.go b/vessel/calicoippool.go index c462fa8..4a1976a 100644 --- a/vessel/calicoippool.go +++ b/vessel/calicoippool.go @@ -126,7 +126,7 @@ func (m manager) AllocIPFromPool(ctx context.Context, poolID string) (types.IPAd var IPsV4 []caliconet.IPNet var IPsV6 []caliconet.IPNet if IPsV4, IPsV6, err = m.cliv3.IPAM().AutoAssign( - context.Background(), + ctx, calicoipam.AutoAssignArgs{ Num4: numIPv4, Num6: numIPv6, diff --git a/vessel/helper.go b/vessel/helper.go index 6e63407..13b6585 100644 --- a/vessel/helper.go +++ b/vessel/helper.go @@ -78,7 +78,7 @@ func (helper Helper) ReleaseContainerAddresses(ctx context.Context, containerID logger.Infof("Release reserved IP by tied containerID(%s)", containerID) container := types.ContainerInfo{ID: containerID, HostName: helper.Hostname()} - if present, err := helper.GetAndDelete(context.Background(), &codec.ContainerInfoCodec{Info: &container}); err != nil { + if present, err := helper.GetAndDelete(ctx, &codec.ContainerInfoCodec{Info: &container}); err != nil { return err } else if !present { logger.Infof("the container(%s) is not exists, will do nothing\n", containerID)