Skip to content

Commit

Permalink
Merge pull request #10 from INFURA/improve_rate_limit
Browse files Browse the repository at this point in the history
Improve rate limiting to be more accurate and easier to control.
  • Loading branch information
EnchanterIO authored Dec 2, 2021
2 parents 9eb85d7 + 3019145 commit a01c5aa
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 74 deletions.
8 changes: 4 additions & 4 deletions cmd/ipfs-copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

const DefaultApiUrl = "https://ipfs.infura.io:5001"
const DefaultWorkersCount = 1
const DefaultMaxReqsPerSec = 10
const Version = "1.2.0"
const DefaultWorkersCount = 20
const DefaultMaxReqsPerSec = 50
const Version = "1.3.0"

type Config struct {
ApiUrl string `env:"IC_API_URL" cli:"api-url"`
Expand Down Expand Up @@ -125,7 +125,7 @@ func PumpBlocksAndCopyPins(ctx context.Context, cfg Config, infuraShell *ipfsApi

pinEnum := ipfsPump.NewAPIPinEnumerator(cfg.SourceAPI, isEnumStreamingPossible)
blocksColl := ipfsPump.NewAPICollector(cfg.SourceAPI)
drain := pump.NewRateLimitedDrain(ipfsPump.NewAPIDrainWithShell(infuraShell), ipfsCopy.CalculateRateLimitDuration(cfg.MaxReqsPerSec))
drain := pump.NewRateLimitedDrain(ipfsPump.NewAPIDrainWithShell(infuraShell))

// Copy all the blocks
ipfsPump.PumpIt(pinEnum, blocksColl, drain, failedCIDsWriter, progressWriter, uint(cfg.Workers))
Expand Down
17 changes: 9 additions & 8 deletions copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"bufio"
"context"
"fmt"
ipfsPump "github.com/INFURA/ipfs-pump/pump"
ipfsCid "github.com/ipfs/go-cid"
ipfsShell "github.com/ipfs/go-ipfs-api"
rl "go.uber.org/ratelimit"
"io"
"log"
"strings"
"sync"
"sync/atomic"
"time"

ipfsPump "github.com/INFURA/ipfs-pump/pump"
ipfsCid "github.com/ipfs/go-cid"
ipfsShell "github.com/ipfs/go-ipfs-api"
)

// PinCIDsFromFile will open the file, read a CID from each line separated by LB char and pin them
Expand Down Expand Up @@ -128,20 +127,22 @@ func pinCIDs(cids <-chan ipfsCid.Cid, workers int, maxReqsPerSec int, infuraShel
successPinsCount = 0
failedPinsCount = 0

// 5 workers (by default) will be handling pinning of the entire file
rlm := rl.New(maxReqsPerSec)

var wg sync.WaitGroup
for w := 1; w <= workers; w++ {
wg.Add(1)
go func() {
for cid := range cids {
// Avoid getting rate limited
rlm.Take()

ok := pinCID(cid, infuraShell, failedPinsWriter)
if ok {
atomic.AddUint64(&successPinsCount, 1)
} else {
atomic.AddUint64(&failedPinsCount, 1)
}
// Avoid getting rate limited
time.Sleep(CalculateRateLimitDuration(maxReqsPerSec))
}
wg.Done()
}()
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ require (
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
github.com/coreos/go-semver v0.3.0
github.com/gravitational/configure v0.0.0-20180808141939-c3428bd84c23
github.com/gravitational/log v0.0.0-20200127200505-fdffa14162b0
github.com/gravitational/log v0.0.0-20200127200505-fdffa14162b0 // indirect
github.com/gravitational/trace v1.1.14 // indirect
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-ipfs-api v0.2.0
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.6.1
go.uber.org/ratelimit v0.2.0
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 h1:AUNCr9CiJuwrRYS3XieqF+Z9B9gNxo/eANAJCF2eiN4=
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/aws/aws-sdk-go v1.15.60 h1:ZSPehAuk0wxKqLMN1AIAMcVQWlLW2wtfJD/nPgxJZuE=
github.com/aws/aws-sdk-go v1.15.60/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -378,6 +380,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/texttheater/golang-levenshtein v0.0.0-20180516184445-d188e65d659e/go.mod h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand All @@ -404,6 +408,10 @@ github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:
github.com/whyrusleeping/yamux v1.1.5/go.mod h1:E8LnQQ8HKx5KD29HZFUwM1PxCOdPRzGwur1mcYhXcD8=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go4.org v0.0.0-20190218023631-ce4c26f7be8e/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180426230345-b49d69b5da94/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -488,6 +496,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.1.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/gotestsum v0.3.3/go.mod h1:0qrQpYrdmNTIx/xcxDS62tIo5eLVdVd2ALLCk1ST0BU=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
12 changes: 3 additions & 9 deletions pump/drain.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package pump

import (
"time"

ipfsPump "github.com/INFURA/ipfs-pump/pump"
)

type RateLimitedDrain struct {
drain ipfsPump.CountedDrain
sleepAfterDrain time.Duration
drain ipfsPump.CountedDrain
}

var _ ipfsPump.CountedDrain = (*ipfsPump.CounterDrain)(nil)

func NewRateLimitedDrain(drain ipfsPump.Drain, sleepAfterDrain time.Duration) ipfsPump.CountedDrain {
func NewRateLimitedDrain(drain ipfsPump.Drain) ipfsPump.CountedDrain {
countedDrain := ipfsPump.NewCountedDrain(drain)

return &RateLimitedDrain{drain: countedDrain, sleepAfterDrain: sleepAfterDrain}
return &RateLimitedDrain{drain: countedDrain}
}

func (d *RateLimitedDrain) Drain(block ipfsPump.Block) error {
Expand All @@ -25,9 +22,6 @@ func (d *RateLimitedDrain) Drain(block ipfsPump.Block) error {
return err
}

// Avoid getting rate limited
time.Sleep(d.sleepAfterDrain)

return nil
}

Expand Down
9 changes: 0 additions & 9 deletions rate_limit.go

This file was deleted.

42 changes: 0 additions & 42 deletions rate_limit_test.go

This file was deleted.

0 comments on commit a01c5aa

Please sign in to comment.