Skip to content

Commit

Permalink
feat(indexer): use multi-thread requests for l1 (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
kallydev committed Jan 23, 2024
1 parent 08fddfe commit a4ebf4a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 14 deletions.
1 change: 1 addition & 0 deletions deploy/base/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ stringData:
rss3_chain:
endpoint_l1: <SEPOLIA_CHAIN_ENDPOINT>
endpoint_l2: <RSS3_CHAIN_ENDPOINT>
block_threads_l1: <BLOCK_THREADS_L1>
1 change: 1 addition & 0 deletions deploy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ database:
rss3_chain:
endpoint_l1: https://rpc.ankr.com/eth_sepolia
endpoint_l2: https://rpc.testnet.rss3.io
block_threads_l1: 20
5 changes: 3 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type File struct {
}

type RSS3ChainConfig struct {
EndpointL1 string `yaml:"endpoint_l1" validate:"required"`
EndpointL2 string `yaml:"endpoint_l2" validate:"required"`
EndpointL1 string `yaml:"endpoint_l1" validate:"required"`
EndpointL2 string `yaml:"endpoint_l2" validate:"required"`
BlockThreadsL1 uint64 `yaml:"block_threads_l1" default:"1"`
}

func Setup(configFilePath string) (*File, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/service/indexer/l1/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package l1

type Config struct {
Endpoint string `yaml:"endpoint"`
Endpoint string `yaml:"endpoint"`
BlockThreads uint64 `yaml:"block_threads"`
}
78 changes: 68 additions & 10 deletions internal/service/indexer/l1/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-bindings/bindings"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/naturalselectionlabs/rss3-global-indexer/internal/service"
"github.com/naturalselectionlabs/rss3-global-indexer/schema"
"github.com/samber/lo"
"github.com/sourcegraph/conc/pool"
"go.uber.org/zap"
)

Expand All @@ -30,6 +33,7 @@ type server struct {
contractL1StandardBridge *bindings.L1StandardBridge
checkpoint *schema.Checkpoint
blockNumberLatest uint64
blockThreads uint64
}

func (s *server) Run(ctx context.Context) (err error) {
Expand Down Expand Up @@ -70,22 +74,75 @@ func (s *server) run(ctx context.Context) (err error) {
continue
}

blockNumberCurrent := s.checkpoint.BlockNumber + 1
// Get blocks from RPC.
blockResultPool := pool.NewWithResults[*types.Block]().
WithContext(ctx).
WithCancelOnError().
WithFirstError()

// Get current block (header and transactions).
block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumberCurrent))
if err != nil {
return fmt.Errorf("get block: %w", err)
for offset := uint64(1); offset <= s.blockThreads; offset++ {
blockNumber := s.checkpoint.BlockNumber + offset

if blockNumber > s.blockNumberLatest {
continue
}

blockResultPool.Go(func(ctx context.Context) (*types.Block, error) {
// Get current block (header and transactions).
block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil {
return nil, fmt.Errorf("get block: %w", err)
}

return block, nil
})
}

// Get all receipts of the current block.
receipts, err := s.ethereumClient.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockNumberCurrent)))
blocks, err := blockResultPool.Wait()
if err != nil {
return fmt.Errorf("get receipts: %w", err)
return fmt.Errorf("wait block result pool: %w", err)
}

// Get receipts from RPC.
var (
receiptsMap = make(map[uint64][]*types.Receipt)
receiptsMapLocker sync.Mutex
)

receiptsPool := pool.New().
WithContext(ctx).
WithCancelOnError().
WithFirstError()

for _, block := range blocks {
block := block

receiptsPool.Go(func(ctx context.Context) error {
receipts, err := s.ethereumClient.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(block.NumberU64())))
if err != nil {
return fmt.Errorf("get block receipts: %w", err)
}

receiptsMapLocker.Lock()
defer receiptsMapLocker.Unlock()
receiptsMap[block.NumberU64()] = receipts

return nil
})
}

if err := receiptsPool.Wait(); err != nil {
return fmt.Errorf("wait receipts pool: %w", err)
}

if err := s.index(ctx, block, receipts); err != nil {
return fmt.Errorf("index block #%d: %w", blockNumberCurrent, err)
sort.SliceStable(blocks, func(i, j int) bool {
return blocks[i].Number().Cmp(blocks[j].Number()) < 0
})

for _, block := range blocks {
if err := s.index(ctx, block, receiptsMap[block.NumberU64()]); err != nil {
return fmt.Errorf("index block #%d: %w", block.NumberU64(), err)
}
}
}
}
Expand Down Expand Up @@ -151,6 +208,7 @@ func NewServer(ctx context.Context, databaseClient database.Client, config Confi
var (
instance = server{
databaseClient: databaseClient,
blockThreads: config.BlockThreads,
}
err error
)
Expand Down
3 changes: 2 additions & 1 deletion internal/service/indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func (s *Server) Run(ctx context.Context) error {
// Run L1 indexer.
errorGroup.Go(func(ctx context.Context) error {
l1Config := l1.Config{
Endpoint: s.config.EndpointL1,
Endpoint: s.config.EndpointL1,
BlockThreads: s.config.BlockThreadsL1,
}

serverL1, err := l1.NewServer(ctx, s.databaseClient, l1Config)
Expand Down

0 comments on commit a4ebf4a

Please sign in to comment.