Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the concurrency of AWS calls to avoid throttling #12

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/ecs-exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
)

const (
defaultListenAddress = ":9222"
defaultAwsRegion = ""
defaultMetricsPath = "/metrics"
defaultClusterFilter = ".*"
defaultConcurrency = 5
defaultDebug = false
defaultDisableCIMetrics = false
defaultListenAddress = ":9222"
defaultMetricsPath = "/metrics"
)

// Cfg is the global configuration
Expand All @@ -32,6 +33,7 @@ type config struct {

listenAddress string
awsRegion string
maxConcurrency int
metricsPath string
clusterFilter string
debug bool
Expand All @@ -55,6 +57,9 @@ func new() *config {
c.fs.StringVar(
&c.awsRegion, "aws.region", defaultAwsRegion, "The AWS region to get metrics from")

c.fs.IntVar(
&c.maxConcurrency, "concurrency", defaultConcurrency, "Max concurrency for API calls to AWS")

c.fs.StringVar(
&c.clusterFilter, "aws.cluster-filter", defaultClusterFilter, "Regex used to filter the cluster names, if doesn't match the cluster is ignored")

Expand Down
2 changes: 1 addition & 1 deletion cmd/ecs-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Main() int {
}

// Create the exporter and register it
exporter, err := collector.New(cfg.awsRegion, cfg.clusterFilter, cfg.disableCIMetrics)
exporter, err := collector.New(cfg.awsRegion, cfg.clusterFilter, cfg.maxConcurrency, cfg.disableCIMetrics)
if err != nil {
log.Error(err)
return 1
Expand Down
71 changes: 36 additions & 35 deletions collector/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ type ECSGatherer interface {

// ECSClient is a wrapper for AWS ecs client that implements helpers to get ECS clusters metrics
type ECSClient struct {
client ecsiface.ECSAPI
apiMaxResults int64
client ecsiface.ECSAPI
apiMaxResults int64
maxConcurrency int
}

// NewECSClient will return an initialized ECSClient
func NewECSClient(awsRegion string) (*ECSClient, error) {
func NewECSClient(awsRegion string, maxConcurrency int) (*ECSClient, error) {
// Create AWS session
s := session.New(&aws.Config{Region: aws.String(awsRegion)})
if s == nil {
return nil, fmt.Errorf("error creating aws session")
}

return &ECSClient{
client: ecs.New(s),
apiMaxResults: 100,
client: ecs.New(s),
apiMaxResults: 100,
maxConcurrency: maxConcurrency,
}, nil
}

Expand Down Expand Up @@ -135,7 +137,7 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS
return res, nil
}

servC := make(chan srvRes)
servC := make(chan srvRes, e.maxConcurrency)

// Only can grab 10 services at a time, create calls in blocks of 10 services
totalGr := 0 // counter for goroutines
Expand All @@ -154,35 +156,7 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS
}

totalGr++
// Make a call on goroutine for each service blocks
go func(services []*string) {
log.Debugf("Getting service descriptions for cluster: %s", cluster.Name)
params := &ecs.DescribeServicesInput{
Services: services,
Cluster: aws.String(cluster.ID),
}
resp, err := e.client.DescribeServices(params)
if err != nil {
servC <- srvRes{nil, err}
}

ss := []*types.ECSService{}

for _, s := range resp.Services {
es := &types.ECSService{
ID: aws.StringValue(s.ServiceArn),
Name: aws.StringValue(s.ServiceName),
DesiredT: aws.Int64Value(s.DesiredCount),
RunningT: aws.Int64Value(s.RunningCount),
PendingT: aws.Int64Value(s.PendingCount),
}
ss = append(ss, es)
}

servC <- srvRes{ss, nil}

}(spss)

go e.callAWSAPI(servC, cluster, spss)
}

// Get all results
Expand All @@ -198,6 +172,33 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS
return res, nil
}

func (e *ECSClient) callAWSAPI(results chan srvRes, cluster *types.ECSCluster, services []*string) {
ss := []*types.ECSService{}
params := &ecs.DescribeServicesInput{
Services: services,
Cluster: aws.String(cluster.ID),
}

resp, err := e.client.DescribeServices(params)
if err != nil {
results <- srvRes{nil, err}
log.Debugf("Added error to the channel")
}

for _, s := range resp.Services {
es := &types.ECSService{
ID: aws.StringValue(s.ServiceArn),
Name: aws.StringValue(s.ServiceName),
DesiredT: aws.Int64Value(s.DesiredCount),
RunningT: aws.Int64Value(s.RunningCount),
PendingT: aws.Int64Value(s.PendingCount),
}
ss = append(ss, es)
}
results <- srvRes{ss, nil}
log.Debugf("Added %s results to the channel", len(ss))
}

// GetClusterContainerInstances will return all the container instances from a cluster
func (e *ECSClient) GetClusterContainerInstances(cluster *types.ECSCluster) ([]*types.ECSContainerInstance, error) {

Expand Down
19 changes: 10 additions & 9 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,18 @@ var (

// Exporter collects ECS clusters metrics
type Exporter struct {
sync.Mutex // Our exporter object will be locakble to protect from concurrent scrapes
client ECSGatherer // Custom ECS client to get information from the clusters
region string // The region where the exporter will scrape
clusterFilter *regexp.Regexp // Compiled regular expresion to filter clusters
noCIMetrics bool // Don't gather container instance metrics
timeout time.Duration // The timeout for the whole gathering process
sync.Mutex // Our exporter object will be locakble to protect from concurrent scrapes
client ECSGatherer // Custom ECS client to get information from the clusters
region string // The region where the exporter will scrape
clusterFilter *regexp.Regexp // Compiled regular expresion to filter clusters
noCIMetrics bool // Don't gather container instance metrics
timeout time.Duration // The timeout for the whole gathering process
maxConcurrency int // The Maximum concurrency allowed for AWS operations
}

// New returns an initialized exporter
func New(awsRegion string, clusterFilterRegexp string, disableCIMetrics bool) (*Exporter, error) {
c, err := NewECSClient(awsRegion)
func New(awsRegion string, clusterFilterRegexp string, maxConcurrency int, disableCIMetrics bool) (*Exporter, error) {
c, err := NewECSClient(awsRegion, maxConcurrency)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +178,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.collectClusterMetrics(ctx, ch, cs)

// Start getting metrics per cluster on its own goroutine
errC := make(chan bool)
errC := make(chan bool, e.maxConcurrency)
totalCs := 0 // total cluster metrics gorotine ran

for _, c := range cs {
Expand Down
6 changes: 3 additions & 3 deletions collector/collector_i_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestCollectError(t *testing.T) {
},
}

exp, err := New("eu-west-1", "", false)
exp, err := New("eu-west-1", "", 1, false)
if err != nil {
t.Errorf("Creation of exporter shouldn't error: %v", err)
}
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestCollectOk(t *testing.T) {
cid: test.cCInstances,
}

exp, err := New("eu-west-1", test.cFilter, test.disableCIM)
exp, err := New("eu-west-1", test.cFilter, 1, test.disableCIM)
if err != nil {
t.Errorf("Creation of exporter shouldn't error: %v", err)
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestCollectTimeoutNoPanic(t *testing.T) {
sleepFor: 10 * time.Millisecond,
}

exp, err := New("eu-west-1", ".*", false)
exp, err := New("eu-west-1", ".*", 1, false)
if err != nil {
t.Errorf("Creation of exporter shouldn't error: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func readGauge(g prometheus.Metric) metricResult {

func TestCollectClusterMetrics(t *testing.T) {
region := "eu-west-1"
exp, err := New(region, "", false)
exp, err := New(region, "", 1, false)
if err != nil {
t.Errorf("Creation of exporter shoudnt error: %v", err)
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestCollectClusterMetrics(t *testing.T) {

func TestCollectClusterServiceMetrics(t *testing.T) {
region := "eu-west-1"
exp, err := New(region, "", false)
exp, err := New(region, "", 1, false)
if err != nil {
t.Errorf("Creation of exporter shouldnt error: %v", err)
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestCollectClusterServiceMetrics(t *testing.T) {

func TestCollectClusterContainerInstanceMetrics(t *testing.T) {
region := "eu-west-1"
exp, err := New(region, "", false)
exp, err := New(region, "", 1, false)
if err != nil {
t.Errorf("Creation of exporter shouldnt error: %v", err)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestValidClusters(t *testing.T) {
}

for _, test := range tests {
e, err := New("eu-west-1", test.filter, false)
e, err := New("eu-west-1", test.filter, 1, false)
if err != nil {
t.Errorf("Creation of exporter shoudn't error: %v", err)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestCollectClusterMetricsTimeout(t *testing.T) {
}
}()

exp, _ := New("eu-west-1", "", false)
exp, _ := New("eu-west-1", "", 1, false)
ch := make(chan prometheus.Metric)
close(ch)

Expand All @@ -328,7 +328,7 @@ func TestCollectClusterServiceMetricsTimeout(t *testing.T) {
}
}()

exp, _ := New("eu-west-1", "", false)
exp, _ := New("eu-west-1", "", 1, false)
ch := make(chan prometheus.Metric)
close(ch)

Expand All @@ -348,7 +348,7 @@ func TestCollectContainerInstanceMetricsTimeout(t *testing.T) {
}
}()

exp, _ := New("eu-west-1", "", false)
exp, _ := New("eu-west-1", "", 1, false)
ch := make(chan prometheus.Metric)
close(ch)

Expand Down