Skip to content

Commit

Permalink
wg to bwg
Browse files Browse the repository at this point in the history
  • Loading branch information
maorfr committed Oct 17, 2018
1 parent 9958369 commit 60e79af
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ Flags:
--kube-context string name of the kubeconfig context to use. Overrides $ORCA_KUBE_CONTEXT
-n, --name string name of environment (namespace) to deploy to. Overrides $ORCA_NAME
--override strings chart to override with different version (can specify multiple): chart=version
-p, --parallel int number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL (default 1)
--repo string chart repository (name=url). Overrides $ORCA_REPO
-s, --set strings set additional parameters
--tls enable TLS for request. Overrides $ORCA_TLS
Expand All @@ -301,6 +302,7 @@ Flags:
--helm-tls-store string path to TLS certs and keys. Overrides $HELM_TLS_STORE
--kube-context string name of the kubeconfig context to use. Overrides $ORCA_KUBE_CONTEXT
-n, --name string name of environment (namespace) to delete. Overrides $ORCA_NAME
-p, --parallel int number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL (default 1)
--tls enable TLS for request. Overrides $ORCA_TLS
```

Expand Down
9 changes: 6 additions & 3 deletions pkg/orca/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type envCmd struct {
inject bool
force bool
deployOnlyOverrideIfEnvExists bool
parallel int

out io.Writer
}
Expand Down Expand Up @@ -132,12 +133,12 @@ func NewDeployEnvCmd(out io.Writer) *cobra.Command {
installedReleases := utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed)
releasesToInstall := utils.GetReleasesDelta(desiredReleases, installedReleases)

utils.DeployChartsFromRepository(releasesToInstall, e.kubeContext, e.name, e.repo, e.helmTLSStore, e.tls, e.packedValues, e.set, e.inject)
utils.DeployChartsFromRepository(releasesToInstall, e.kubeContext, e.name, e.repo, e.helmTLSStore, e.tls, e.packedValues, e.set, e.inject, e.parallel)

if !e.deployOnlyOverrideIfEnvExists {
installedReleases = utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed)
releasesToDelete := utils.GetReleasesDelta(installedReleases, desiredReleases)
utils.DeleteReleases(releasesToDelete, e.kubeContext, e.helmTLSStore, e.tls)
utils.DeleteReleases(releasesToDelete, e.kubeContext, e.helmTLSStore, e.tls, e.parallel)
}
unlockEnvironment(e.name, e.kubeContext, print)
},
Expand All @@ -156,6 +157,7 @@ func NewDeployEnvCmd(out io.Writer) *cobra.Command {
f.StringVar(&e.helmTLSStore, "helm-tls-store", os.Getenv("HELM_TLS_STORE"), "path to TLS certs and keys. Overrides $HELM_TLS_STORE")
f.BoolVar(&e.inject, "inject", utils.GetBoolEnvVar("ORCA_INJECT", false), "enable injection during helm upgrade. Overrides $ORCA_INJECT (requires helm inject plugin: https://github.com/maorfr/helm-inject)")
f.BoolVarP(&e.deployOnlyOverrideIfEnvExists, "deploy-only-override-if-env-exists", "x", false, "if environment exists - deploy only override(s) (support for features spanning multiple services). Overrides $ORCA_DEPLOY_ONLY_OVERRIDE_IF_ENV_EXISTS")
f.IntVarP(&e.parallel, "parallel", "p", utils.GetIntEnvVar("ORCA_PARALLEL", 1), "number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL")

f.BoolVar(&e.createNS, "create-ns", utils.GetBoolEnvVar("ORCA_CREATE_NS", false), "should create new namespace. Overrides $ORCA_CREATE_NS")
f.MarkDeprecated("create-ns", "namespace will be created if it does not exist")
Expand Down Expand Up @@ -188,7 +190,7 @@ func NewDeleteEnvCmd(out io.Writer) *cobra.Command {
includeFailed := true
markEnvironmentForDeletion(e.name, e.kubeContext, e.force, print)
releases := utils.GetInstalledReleases(e.kubeContext, e.name, includeFailed)
utils.DeleteReleases(releases, e.kubeContext, e.helmTLSStore, e.tls)
utils.DeleteReleases(releases, e.kubeContext, e.helmTLSStore, e.tls, e.parallel)
utils.DeleteNamespace(e.name, e.kubeContext, print)
log.Printf("deleted environment \"%s\"", e.name)
},
Expand All @@ -201,6 +203,7 @@ func NewDeleteEnvCmd(out io.Writer) *cobra.Command {
f.BoolVar(&e.tls, "tls", utils.GetBoolEnvVar("ORCA_TLS", false), "enable TLS for request. Overrides $ORCA_TLS")
f.StringVar(&e.helmTLSStore, "helm-tls-store", os.Getenv("HELM_TLS_STORE"), "path to TLS certs and keys. Overrides $HELM_TLS_STORE")
f.BoolVar(&e.force, "force", utils.GetBoolEnvVar("ORCA_FORCE", false), "force environment deletion. Overrides $ORCA_FORCE")
f.IntVarP(&e.parallel, "parallel", "p", utils.GetIntEnvVar("ORCA_PARALLEL", 1), "number of releases to act on in parallel. set this flag to 0 for full parallelism. Overrides $ORCA_PARALLEL")

return cmd
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/utils/bwg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package utils

import (
"sync"
)

type BoundedWaitGroup struct {
wg sync.WaitGroup
ch chan struct{}
}

func NewBoundedWaitGroup(cap int) BoundedWaitGroup {
return BoundedWaitGroup{ch: make(chan struct{}, cap)}
}

func (bwg *BoundedWaitGroup) Add(delta int) {
for i := 0; i > delta; i-- {
<-bwg.ch
}
for i := 0; i < delta; i++ {
bwg.ch <- struct{}{}
}
bwg.wg.Add(delta)
}

func (bwg *BoundedWaitGroup) Done() {
bwg.Add(-1)
}

func (bwg *BoundedWaitGroup) Wait() {
bwg.wg.Wait()
}
31 changes: 21 additions & 10 deletions pkg/utils/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@ package utils
import (
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"time"
)

// DeployChartsFromRepository deploys a list of Helm charts from a repository in parallel
func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, namespace, repo, helmTLSStore string, tls bool, packedValues, set []string, inject bool) {
func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, namespace, repo, helmTLSStore string, tls bool, packedValues, set []string, inject bool, parallel int) {

totalReleases := len(releasesToInstall)
if parallel == 0 {
parallel = totalReleases
}
bwgSize := int(math.Min(float64(parallel), float64(totalReleases))) // Very stingy :)
bwg := NewBoundedWaitGroup(bwgSize)
var mutex = &sync.Mutex{}
var wg sync.WaitGroup

for len(releasesToInstall) > 0 {

mutex.Lock()
for _, c := range releasesToInstall {

wg.Add(1)
bwg.Add(1)
go func(c ReleaseSpec) {
defer wg.Done()
defer bwg.Done()

// If there are (still) any dependencies - leave this chart for a later iteration
if len(c.Dependencies) != 0 {
Expand Down Expand Up @@ -56,7 +62,7 @@ func DeployChartsFromRepository(releasesToInstall []ReleaseSpec, kubeContext, na
mutex.Unlock()
time.Sleep(5 * time.Second)
}
wg.Wait()
bwg.Wait()
}

// DeployChartFromRepository deploys a Helm chart from a chart repository
Expand Down Expand Up @@ -216,19 +222,24 @@ func UpgradeRelease(name, releaseName, kubeContext, namespace, values, set strin
}

// DeleteReleases deletes a list of releases in parallel
func DeleteReleases(releasesToDelete []ReleaseSpec, kubeContext, helmTLSStore string, tls bool) {
var wg sync.WaitGroup
func DeleteReleases(releasesToDelete []ReleaseSpec, kubeContext, helmTLSStore string, tls bool, parallel int) {
totalReleases := len(releasesToDelete)
if parallel == 0 {
parallel = totalReleases
}
bwgSize := int(math.Min(float64(parallel), float64(totalReleases))) // Very stingy :)
bwg := NewBoundedWaitGroup(bwgSize)

for _, c := range releasesToDelete {
wg.Add(1)
bwg.Add(1)
go func(c ReleaseSpec) {
defer wg.Done()
defer bwg.Done()
log.Println("deleting", c.ReleaseName)
DeleteRelease(c.ReleaseName, kubeContext, tls, helmTLSStore, false)
log.Println("deleted", c.ReleaseName)
}(c)
}
wg.Wait()
bwg.Wait()
}

// DeleteRelease deletes a release from Kubernetes
Expand Down

0 comments on commit 60e79af

Please sign in to comment.