From 166282d3e50ca9463fd3956eb914c8948406db1f Mon Sep 17 00:00:00 2001 From: KensoDev Date: Thu, 23 Aug 2018 09:30:37 -0700 Subject: [PATCH 1/2] Limit the concurrency of AWS calls to avoid throttling --- cmd/ecs-exporter/config.go | 9 +++-- cmd/ecs-exporter/main.go | 2 +- collector/aws.go | 71 +++++++++++++++++++------------------ collector/collector.go | 19 +++++----- collector/collector_test.go | 14 ++++---- 5 files changed, 61 insertions(+), 54 deletions(-) diff --git a/cmd/ecs-exporter/config.go b/cmd/ecs-exporter/config.go index cc65c79..91cbff0 100644 --- a/cmd/ecs-exporter/config.go +++ b/cmd/ecs-exporter/config.go @@ -10,12 +10,13 @@ import ( ) const ( - defaultListenAddress = ":9222" defaultAwsRegion = "" - defaultMetricsPath = "/metrics" defaultClusterFilter = ".*" + defaultConcurrency = 5 defaultDebug = false defaultDisableCIMetrics = false + defaultListenAddress = ":9222" + defaultMetricsPath = "/metrics" ) // Cfg is the global configuration @@ -32,6 +33,7 @@ type config struct { listenAddress string awsRegion string + maxConcurrency int metricsPath string clusterFilter string debug bool @@ -55,6 +57,9 @@ func new() *config { c.fs.StringVar( &c.awsRegion, "aws.region", defaultAwsRegion, "The AWS region to get metrics from") + c.fs.IntVar( + &c.maxConcurrency, "concurrency", defaultConcurrency, "Max concurrency for API calls to AWS") + c.fs.StringVar( &c.clusterFilter, "aws.cluster-filter", defaultClusterFilter, "Regex used to filter the cluster names, if doesn't match the cluster is ignored") diff --git a/cmd/ecs-exporter/main.go b/cmd/ecs-exporter/main.go index bfb76c6..b0014fb 100644 --- a/cmd/ecs-exporter/main.go +++ b/cmd/ecs-exporter/main.go @@ -31,7 +31,7 @@ func Main() int { } // Create the exporter and register it - exporter, err := collector.New(cfg.awsRegion, cfg.clusterFilter, cfg.disableCIMetrics) + exporter, err := collector.New(cfg.awsRegion, cfg.clusterFilter, cfg.maxConcurrency, cfg.disableCIMetrics) if err != nil { log.Error(err) return 
1 diff --git a/collector/aws.go b/collector/aws.go index 4d7ad25..dd90e7a 100644 --- a/collector/aws.go +++ b/collector/aws.go @@ -28,12 +28,13 @@ type ECSGatherer interface { // ECSClient is a wrapper for AWS ecs client that implements helpers to get ECS clusters metrics type ECSClient struct { - client ecsiface.ECSAPI - apiMaxResults int64 + client ecsiface.ECSAPI + apiMaxResults int64 + maxConcurrency int } // NewECSClient will return an initialized ECSClient -func NewECSClient(awsRegion string) (*ECSClient, error) { +func NewECSClient(awsRegion string, maxConcurrency int) (*ECSClient, error) { // Create AWS session s := session.New(&aws.Config{Region: aws.String(awsRegion)}) if s == nil { @@ -41,8 +42,9 @@ func NewECSClient(awsRegion string) (*ECSClient, error) { } return &ECSClient{ - client: ecs.New(s), - apiMaxResults: 100, + client: ecs.New(s), + apiMaxResults: 100, + maxConcurrency: maxConcurrency, }, nil } @@ -135,7 +137,7 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS return res, nil } - servC := make(chan srvRes) + servC := make(chan srvRes, e.maxConcurrency) // Only can grab 10 services at a time, create calls in blocks of 10 services totalGr := 0 // counter for goroutines @@ -154,35 +156,7 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS } totalGr++ - // Make a call on goroutine for each service blocks - go func(services []*string) { - log.Debugf("Getting service descriptions for cluster: %s", cluster.Name) - params := &ecs.DescribeServicesInput{ - Services: services, - Cluster: aws.String(cluster.ID), - } - resp, err := e.client.DescribeServices(params) - if err != nil { - servC <- srvRes{nil, err} - } - - ss := []*types.ECSService{} - - for _, s := range resp.Services { - es := &types.ECSService{ - ID: aws.StringValue(s.ServiceArn), - Name: aws.StringValue(s.ServiceName), - DesiredT: aws.Int64Value(s.DesiredCount), - RunningT: aws.Int64Value(s.RunningCount), - PendingT: 
aws.Int64Value(s.PendingCount),
-				}
-				ss = append(ss, es)
-			}
-
-			servC <- srvRes{ss, nil}
-
-		}(spss)
-
+		go e.callAWSAPI(servC, cluster, spss)
 	}
 
 	// Get all results
@@ -198,6 +172,42 @@ func (e *ECSClient) GetClusterServices(cluster *types.ECSCluster) ([]*types.ECSS
 	return res, nil
 }
 
+// callAWSAPI describes one block of services of the given cluster and sends
+// exactly one srvRes on results: the error if DescribeServices failed, or the
+// converted services otherwise.
+//
+// NOTE(review): this is started with `go` once per block of services; buffering
+// the results channel does not by itself cap how many calls are in flight.
+func (e *ECSClient) callAWSAPI(results chan srvRes, cluster *types.ECSCluster, services []*string) {
+	params := &ecs.DescribeServicesInput{
+		Services: services,
+		Cluster:  aws.String(cluster.ID),
+	}
+
+	resp, err := e.client.DescribeServices(params)
+	if err != nil {
+		results <- srvRes{nil, err}
+		log.Debugf("Added error to the channel")
+		// Stop here: resp is not meaningful after a failed call, and a second
+		// send would hand the receiver two results for a single goroutine.
+		return
+	}
+
+	ss := make([]*types.ECSService, 0, len(resp.Services))
+	for _, s := range resp.Services {
+		es := &types.ECSService{
+			ID:       aws.StringValue(s.ServiceArn),
+			Name:     aws.StringValue(s.ServiceName),
+			DesiredT: aws.Int64Value(s.DesiredCount),
+			RunningT: aws.Int64Value(s.RunningCount),
+			PendingT: aws.Int64Value(s.PendingCount),
+		}
+		ss = append(ss, es)
+	}
+	results <- srvRes{ss, nil}
+	log.Debugf("Added %d results to the channel", len(ss))
+}
+
 // GetClusterContainerInstances will return all the container instances from a cluster
 func (e *ECSClient) GetClusterContainerInstances(cluster *types.ECSCluster) ([]*types.ECSContainerInstance, error) {
 
diff --git a/collector/collector.go b/collector/collector.go
index 557adb6..b5976f2 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -87,17 +87,18 @@ var (
 
 // Exporter collects ECS clusters metrics
 type Exporter struct {
-	sync.Mutex // Our exporter object will be locakble 
to protect from concurrent scrapes + client ECSGatherer // Custom ECS client to get information from the clusters + region string // The region where the exporter will scrape + clusterFilter *regexp.Regexp // Compiled regular expresion to filter clusters + noCIMetrics bool // Don't gather container instance metrics + timeout time.Duration // The timeout for the whole gathering process + maxConcurrency int // The Maximum concurrency allowed for AWS operations } // New returns an initialized exporter -func New(awsRegion string, clusterFilterRegexp string, disableCIMetrics bool) (*Exporter, error) { - c, err := NewECSClient(awsRegion) +func New(awsRegion string, clusterFilterRegexp string, maxConcurrency int, disableCIMetrics bool) (*Exporter, error) { + c, err := NewECSClient(awsRegion, maxConcurrency) if err != nil { return nil, err } @@ -177,7 +178,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { e.collectClusterMetrics(ctx, ch, cs) // Start getting metrics per cluster on its own goroutine - errC := make(chan bool) + errC := make(chan bool, e.maxConcurrency) totalCs := 0 // total cluster metrics gorotine ran for _, c := range cs { diff --git a/collector/collector_test.go b/collector/collector_test.go index 062c9e7..34a55fd 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -35,7 +35,7 @@ func readGauge(g prometheus.Metric) metricResult { func TestCollectClusterMetrics(t *testing.T) { region := "eu-west-1" - exp, err := New(region, "", false) + exp, err := New(region, "", 1, false) if err != nil { t.Errorf("Creation of exporter shoudnt error: %v", err) } @@ -74,7 +74,7 @@ func TestCollectClusterMetrics(t *testing.T) { func TestCollectClusterServiceMetrics(t *testing.T) { region := "eu-west-1" - exp, err := New(region, "", false) + exp, err := New(region, "", 1, false) if err != nil { t.Errorf("Creation of exporter shouldnt error: %v", err) } @@ -149,7 +149,7 @@ func TestCollectClusterServiceMetrics(t *testing.T) { func 
TestCollectClusterContainerInstanceMetrics(t *testing.T) { region := "eu-west-1" - exp, err := New(region, "", false) + exp, err := New(region, "", 1, false) if err != nil { t.Errorf("Creation of exporter shouldnt error: %v", err) } @@ -280,7 +280,7 @@ func TestValidClusters(t *testing.T) { } for _, test := range tests { - e, err := New("eu-west-1", test.filter, false) + e, err := New("eu-west-1", test.filter, 1, false) if err != nil { t.Errorf("Creation of exporter shoudn't error: %v", err) } @@ -309,7 +309,7 @@ func TestCollectClusterMetricsTimeout(t *testing.T) { } }() - exp, _ := New("eu-west-1", "", false) + exp, _ := New("eu-west-1", "", 1, false) ch := make(chan prometheus.Metric) close(ch) @@ -328,7 +328,7 @@ func TestCollectClusterServiceMetricsTimeout(t *testing.T) { } }() - exp, _ := New("eu-west-1", "", false) + exp, _ := New("eu-west-1", "", 1, false) ch := make(chan prometheus.Metric) close(ch) @@ -348,7 +348,7 @@ func TestCollectContainerInstanceMetricsTimeout(t *testing.T) { } }() - exp, _ := New("eu-west-1", "", false) + exp, _ := New("eu-west-1", "", 1, false) ch := make(chan prometheus.Metric) close(ch) From 2e1a539a3feadfe53207e771a9532a55904f4304 Mon Sep 17 00:00:00 2001 From: KensoDev Date: Thu, 23 Aug 2018 10:16:47 -0700 Subject: [PATCH 2/2] Fixing tests --- collector/collector_i_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/collector/collector_i_test.go b/collector/collector_i_test.go index 7c09c64..b33aec6 100644 --- a/collector/collector_i_test.go +++ b/collector/collector_i_test.go @@ -108,7 +108,7 @@ func TestCollectError(t *testing.T) { }, } - exp, err := New("eu-west-1", "", false) + exp, err := New("eu-west-1", "", 1, false) if err != nil { t.Errorf("Creation of exporter shouldn't error: %v", err) } @@ -604,7 +604,7 @@ func TestCollectOk(t *testing.T) { cid: test.cCInstances, } - exp, err := New("eu-west-1", test.cFilter, test.disableCIM) + exp, err := New("eu-west-1", test.cFilter, 1, 
test.disableCIM) if err != nil { t.Errorf("Creation of exporter shouldn't error: %v", err) } @@ -661,7 +661,7 @@ func TestCollectTimeoutNoPanic(t *testing.T) { sleepFor: 10 * time.Millisecond, } - exp, err := New("eu-west-1", ".*", false) + exp, err := New("eu-west-1", ".*", 1, false) if err != nil { t.Errorf("Creation of exporter shouldn't error: %v", err) }