diff --git a/README.md b/README.md index e0705bc..022cff3 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,7 @@ Flags: --kube-context string name of the kubeconfig context to use. Overrides $ORCA_KUBE_CONTEXT -n, --name string name of environment (namespace) to deploy to. Overrides $ORCA_NAME --override strings chart to override with different version (can specify multiple): chart=version + -p, --parallel int number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL (default 1) --repo string chart repository (name=url). Overrides $ORCA_REPO -s, --set strings set additional parameters --tls enable TLS for request. Overrides $ORCA_TLS @@ -301,6 +302,7 @@ Flags: --helm-tls-store string path to TLS certs and keys. Overrides $HELM_TLS_STORE --kube-context string name of the kubeconfig context to use. Overrides $ORCA_KUBE_CONTEXT -n, --name string name of environment (namespace) to delete. Overrides $ORCA_NAME + -p, --parallel int number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL (default 1) --tls enable TLS for request. Overrides $ORCA_TLS ``` diff --git a/pkg/orca/env.go b/pkg/orca/env.go index 8ae8ea9..f914f94 100644 --- a/pkg/orca/env.go +++ b/pkg/orca/env.go @@ -28,6 +28,7 @@ type envCmd struct { inject bool force bool deployOnlyOverrideIfEnvExists bool + parallel int out io.Writer } @@ -132,12 +133,12 @@ func NewDeployEnvCmd(out io.Writer) *cobra.Command { installedReleases := utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed) releasesToInstall := utils.GetReleasesDelta(desiredReleases, installedReleases) - utils.DeployChartsFromRepository(releasesToInstall, e.kubeContext, e.name, e.repo, e.helmTLSStore, e.tls, e.packedValues, e.set, e.inject) + utils.DeployChartsFromRepository(releasesToInstall, e.kubeContext, e.name, e.repo, e.helmTLSStore, e.tls, e.packedValues, e.set, e.inject, e.parallel) if !e.deployOnlyOverrideIfEnvExists { installedReleases = utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed) releasesToDelete := utils.GetReleasesDelta(installedReleases, desiredReleases) - utils.DeleteReleases(releasesToDelete, e.kubeContext, e.helmTLSStore, e.tls) + utils.DeleteReleases(releasesToDelete, e.kubeContext, e.helmTLSStore, e.tls, e.parallel) } unlockEnvironment(e.name, e.kubeContext, print) }, @@ -156,6 +157,7 @@ func NewDeployEnvCmd(out io.Writer) *cobra.Command { f.StringVar(&e.helmTLSStore, "helm-tls-store", os.Getenv("HELM_TLS_STORE"), "path to TLS certs and keys. Overrides $HELM_TLS_STORE") f.BoolVar(&e.inject, "inject", utils.GetBoolEnvVar("ORCA_INJECT", false), "enable injection during helm upgrade. Overrides $ORCA_INJECT (requires helm inject plugin: https://github.com/maorfr/helm-inject)") f.BoolVarP(&e.deployOnlyOverrideIfEnvExists, "deploy-only-override-if-env-exists", "x", false, "if environment exists - deploy only override(s) (support for features spanning multiple services). Overrides $ORCA_DEPLOY_ONLY_OVERRIDE_IF_ENV_EXISTS") + f.IntVarP(&e.parallel, "parallel", "p", utils.GetIntEnvVar("ORCA_PARALLEL", 1), "number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL") f.BoolVar(&e.createNS, "create-ns", utils.GetBoolEnvVar("ORCA_CREATE_NS", false), "should create new namespace. Overrides $ORCA_CREATE_NS") f.MarkDeprecated("create-ns", "namespace will be created if it does not exist") @@ -188,7 +190,7 @@ func NewDeleteEnvCmd(out io.Writer) *cobra.Command { includeFailed := true markEnvironmentForDeletion(e.name, e.kubeContext, e.force, print) releases := utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed) - utils.DeleteReleases(releases, e.kubeContext, e.helmTLSStore, e.tls) + utils.DeleteReleases(releases, e.kubeContext, e.helmTLSStore, e.tls, e.parallel) utils.DeleteNamespace(e.name, e.kubeContext, print) log.Printf("deleted environment \"%s\"", e.name) }, @@ -201,6 +203,7 @@ func NewDeleteEnvCmd(out io.Writer) *cobra.Command { f.BoolVar(&e.tls, "tls", utils.GetBoolEnvVar("ORCA_TLS", false), "enable TLS for request. Overrides $ORCA_TLS") f.StringVar(&e.helmTLSStore, "helm-tls-store", os.Getenv("HELM_TLS_STORE"), "path to TLS certs and keys. Overrides $HELM_TLS_STORE") f.BoolVar(&e.force, "force", utils.GetBoolEnvVar("ORCA_FORCE", false), "force environment deletion. Overrides $ORCA_FORCE") + f.IntVarP(&e.parallel, "parallel", "p", utils.GetIntEnvVar("ORCA_PARALLEL", 1), "number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL") return cmd } diff --git a/pkg/utils/bwg.go b/pkg/utils/bwg.go new file mode 100644 index 0000000..faeb919 --- /dev/null +++ b/pkg/utils/bwg.go @@ -0,0 +1,32 @@ +package utils + +import ( + "sync" +) + +type BoundedWaitGroup struct { + wg sync.WaitGroup + ch chan struct{} +} + +func NewBoundedWaitGroup(cap int) BoundedWaitGroup { + return BoundedWaitGroup{ch: make(chan struct{}, cap)} +} + +func (bwg *BoundedWaitGroup) Add(delta int) { + for i := 0; i > delta; i-- { + <-bwg.ch + } + for i := 0; i < delta; i++ { + bwg.ch <- struct{}{} + } + bwg.wg.Add(delta) +} + +func (bwg *BoundedWaitGroup) Done() { + bwg.Add(-1) +} + +func (bwg *BoundedWaitGroup) Wait() { + bwg.wg.Wait() +} diff --git a/pkg/utils/helm.go b/pkg/utils/helm.go index dd2097a..89abd56 100644 --- a/pkg/utils/helm.go +++ b/pkg/utils/helm.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "log" + "math" "os" "strings" "sync" @@ -10,19 +11,24 @@ import ( ) // DeployChartsFromRepository deploys a list of Helm charts from a repository in parallel -func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, namespace, repo, helmTLSStore string, tls bool, packedValues, set []string, inject bool) { +func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, namespace, repo, helmTLSStore string, tls bool, packedValues, set []string, inject bool, parallel int) { + totalReleases := len(releasesToInstall) + if parallel == 0 { + parallel = totalReleases + } + bwgSize := int(math.Min(float64(parallel), float64(totalReleases))) // Very stingy :) + bwg := NewBoundedWaitGroup(bwgSize) var mutex = &sync.Mutex{} - var wg sync.WaitGroup for len(releasesToInstall) > 0 { mutex.Lock() for _, c := range releasesToInstall { - wg.Add(1) + bwg.Add(1) go func(c ReleaseSpec) { - defer wg.Done() + defer bwg.Done() // If there are (still) any dependencies - leave this chart for a later iteration if len(c.Dependencies) != 0 { @@ -56,7 +62,7 @@ func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, na mutex.Unlock() time.Sleep(5 * time.Second) } - wg.Wait() + bwg.Wait() } // DeployChartFromRepository deploys a Helm chart from a chart repository @@ -216,19 +222,24 @@ func UpgradeRelease(name, releaseName, kubeContext, namespace, values, set strin } // DeleteReleases deletes a list of releases in parallel -func DeleteReleases(releasesToDelete []ReleaseSpec, kubeContext, helmTLSStore string, tls bool) { - var wg sync.WaitGroup +func DeleteReleases(releasesToDelete []ReleaseSpec, kubeContext, helmTLSStore string, tls bool, parallel int) { + totalReleases := len(releasesToDelete) + if parallel == 0 { + parallel = totalReleases + } + bwgSize := int(math.Min(float64(parallel), float64(totalReleases))) // Very stingy :) + bwg := NewBoundedWaitGroup(bwgSize) for _, c := range releasesToDelete { - wg.Add(1) + bwg.Add(1) go func(c ReleaseSpec) { - defer wg.Done() + defer bwg.Done() log.Println("deleting", c.ReleaseName) DeleteRelease(c.ReleaseName, kubeContext, tls, helmTLSStore, false) log.Println("deleted", c.ReleaseName) }(c) } - wg.Wait() + bwg.Wait() } // DeleteRelease deletes a release from Kubernetes