diff --git a/deploy/base/secret.yaml b/deploy/base/secret.yaml index af5ac09d..5b7d581b 100644 --- a/deploy/base/secret.yaml +++ b/deploy/base/secret.yaml @@ -14,3 +14,4 @@ stringData: rss3_chain: endpoint_l1: endpoint_l2: + block_threads_l1: 20 diff --git a/deploy/config.yaml b/deploy/config.yaml index 3b2be62c..319f9884 100644 --- a/deploy/config.yaml +++ b/deploy/config.yaml @@ -8,3 +8,4 @@ database: rss3_chain: endpoint_l1: https://rpc.ankr.com/eth_sepolia endpoint_l2: https://rpc.testnet.rss3.io + block_threads_l1: 20 diff --git a/internal/config/config.go b/internal/config/config.go index 4f43aeee..679b7638 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -23,8 +23,9 @@ type File struct { } type RSS3ChainConfig struct { - EndpointL1 string `yaml:"endpoint_l1" validate:"required"` - EndpointL2 string `yaml:"endpoint_l2" validate:"required"` + EndpointL1 string `yaml:"endpoint_l1" validate:"required"` + EndpointL2 string `yaml:"endpoint_l2" validate:"required"` + BlockThreadsL1 uint64 `yaml:"block_threads_l1" default:"1"` } func Setup(configFilePath string) (*File, error) { diff --git a/internal/service/indexer/l1/config.go b/internal/service/indexer/l1/config.go index 05215a18..76c383cc 100644 --- a/internal/service/indexer/l1/config.go +++ b/internal/service/indexer/l1/config.go @@ -1,5 +1,6 @@ package l1 type Config struct { - Endpoint string `yaml:"endpoint"` + Endpoint string `yaml:"endpoint"` + BlockThreads uint64 `yaml:"block_threads"` } diff --git a/internal/service/indexer/l1/indexer.go b/internal/service/indexer/l1/indexer.go index 1ca7ac54..f314c687 100644 --- a/internal/service/indexer/l1/indexer.go +++ b/internal/service/indexer/l1/indexer.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math/big" + "sort" + "sync" "time" "github.com/ethereum-optimism/optimism/op-bindings/bindings" @@ -15,6 +17,7 @@ import ( "github.com/naturalselectionlabs/rss3-global-indexer/internal/service" "github.com/naturalselectionlabs/rss3-global-indexer/schema" "github.com/samber/lo" + "github.com/sourcegraph/conc/pool" "go.uber.org/zap" ) @@ -30,6 +33,7 @@ type server struct { contractL1StandardBridge *bindings.L1StandardBridge checkpoint *schema.Checkpoint blockNumberLatest uint64 + blockThreads uint64 } func (s *server) Run(ctx context.Context) (err error) { @@ -70,22 +74,75 @@ func (s *server) run(ctx context.Context) (err error) { continue } - blockNumberCurrent := s.checkpoint.BlockNumber + 1 + // Get blocks from RPC. + blockResultPool := pool.NewWithResults[*types.Block](). + WithContext(ctx). + WithCancelOnError(). + WithFirstError() - // Get current block (header and transactions). - block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumberCurrent)) - if err != nil { - return fmt.Errorf("get block: %w", err) + for offset := uint64(1); offset <= s.blockThreads; offset++ { + blockNumber := s.checkpoint.BlockNumber + offset + + if blockNumber > s.blockNumberLatest { + continue + } + + blockResultPool.Go(func(ctx context.Context) (*types.Block, error) { + // Get current block (header and transactions). + block, err := s.ethereumClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) + if err != nil { + return nil, fmt.Errorf("get block: %w", err) + } + + return block, nil + }) } - // Get all receipts of the current block. - receipts, err := s.ethereumClient.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockNumberCurrent))) + blocks, err := blockResultPool.Wait() if err != nil { - return fmt.Errorf("get receipts: %w", err) + return fmt.Errorf("wait block result pool: %w", err) + } + + // Get receipts from RPC. + var ( + receiptsMap = make(map[uint64][]*types.Receipt) + receiptsMapLocker sync.Mutex + ) + + receiptsPool := pool.New(). + WithContext(ctx). + WithCancelOnError(). + WithFirstError() + + for _, block := range blocks { + block := block + + receiptsPool.Go(func(ctx context.Context) error { + receipts, err := s.ethereumClient.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(block.NumberU64()))) + if err != nil { + return fmt.Errorf("get block receipts: %w", err) + } + + receiptsMapLocker.Lock() + defer receiptsMapLocker.Unlock() + receiptsMap[block.NumberU64()] = receipts + + return nil + }) + } + + if err := receiptsPool.Wait(); err != nil { + return fmt.Errorf("wait receipts pool: %w", err) } - if err := s.index(ctx, block, receipts); err != nil { - return fmt.Errorf("index block #%d: %w", blockNumberCurrent, err) + sort.SliceStable(blocks, func(i, j int) bool { + return blocks[i].Number().Cmp(blocks[j].Number()) < 0 + }) + + for _, block := range blocks { + if err := s.index(ctx, block, receiptsMap[block.NumberU64()]); err != nil { + return fmt.Errorf("index block #%d: %w", block.NumberU64(), err) + } } } } @@ -151,6 +208,7 @@ func NewServer(ctx context.Context, databaseClient database.Client, config Confi var ( instance = server{ databaseClient: databaseClient, + blockThreads: config.BlockThreads, } err error ) diff --git a/internal/service/indexer/server.go b/internal/service/indexer/server.go index 2740022e..3a58213e 100644 --- a/internal/service/indexer/server.go +++ b/internal/service/indexer/server.go @@ -21,7 +21,8 @@ func (s *Server) Run(ctx context.Context) error { // Run L1 indexer. errorGroup.Go(func(ctx context.Context) error { l1Config := l1.Config{ - Endpoint: s.config.EndpointL1, + Endpoint: s.config.EndpointL1, + BlockThreads: s.config.BlockThreadsL1, } serverL1, err := l1.NewServer(ctx, s.databaseClient, l1Config)