Skip to content

Commit

Permalink
Fixed polygon sync when no heimdall waypoints are available (#13844)
Browse files Browse the repository at this point in the history
  • Loading branch information
eastorski authored Feb 17, 2025
1 parent 1c7a902 commit efdd02a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 53 deletions.
15 changes: 3 additions & 12 deletions polygon/heimdall/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package heimdall

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -149,18 +148,10 @@ func (s *Scraper[TEntity]) RegisterObserver(observer func([]TEntity)) event.Unre
return s.observers.Register(observer)
}

func (s *Scraper[TEntity]) Synchronize(ctx context.Context) (TEntity, error) {
func (s *Scraper[TEntity]) Synchronize(ctx context.Context) (TEntity, bool, error) {
if err := s.syncEvent.Wait(ctx); err != nil {
return generics.Zero[TEntity](), err
return generics.Zero[TEntity](), false, err
}

last, ok, err := s.store.LastEntity(ctx)
if err != nil {
return generics.Zero[TEntity](), err
}
if !ok {
return generics.Zero[TEntity](), errors.New("unexpected last entity not available")
}

return last, nil
return s.store.LastEntity(ctx)
}
10 changes: 7 additions & 3 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ func (s *Service) Span(ctx context.Context, id uint64) (*Span, bool, error) {
return s.reader.Span(ctx, id)
}

func (s *Service) SynchronizeCheckpoints(ctx context.Context) (*Checkpoint, error) {
func (s *Service) SynchronizeCheckpoints(ctx context.Context) (*Checkpoint, bool, error) {
s.logger.Info(heimdallLogPrefix("synchronizing checkpoints..."))
return s.checkpointScraper.Synchronize(ctx)
}

func (s *Service) SynchronizeMilestones(ctx context.Context) (*Milestone, error) {
func (s *Service) SynchronizeMilestones(ctx context.Context) (*Milestone, bool, error) {
s.logger.Info(heimdallLogPrefix("synchronizing milestones..."))
return s.milestoneScraper.Synchronize(ctx)
}
Expand Down Expand Up @@ -205,9 +205,13 @@ func (s *Service) SynchronizeSpans(ctx context.Context, blockNum uint64) error {
}

func (s *Service) synchronizeSpans(ctx context.Context) error {
if _, err := s.spanScraper.Synchronize(ctx); err != nil {
_, ok, err := s.spanScraper.Synchronize(ctx)
if err != nil {
return err
}
if !ok {
return errors.New("unexpected last entity not available")
}

if err := s.spanBlockProducersTracker.Synchronize(ctx); err != nil {
return err
Expand Down
8 changes: 6 additions & 2 deletions polygon/heimdall/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,16 @@ func (suite *ServiceTestSuite) SetupSuite() {
return suite.service.Run(suite.ctx)
})

lastMilestone, err := suite.service.SynchronizeMilestones(suite.ctx)
lastMilestone, ok, err := suite.service.SynchronizeMilestones(suite.ctx)
require.NoError(suite.T(), err)
require.True(suite.T(), ok)
require.Equal(suite.T(), suite.expectedLastMilestone, uint64(lastMilestone.Id))
lastCheckpoint, err := suite.service.SynchronizeCheckpoints(suite.ctx)

lastCheckpoint, ok, err := suite.service.SynchronizeCheckpoints(suite.ctx)
require.NoError(suite.T(), err)
require.True(suite.T(), ok)
require.Equal(suite.T(), suite.expectedLastCheckpoint, uint64(lastCheckpoint.Id))

err = suite.service.SynchronizeSpans(suite.ctx, math.MaxInt)
require.NoError(suite.T(), err)
}
Expand Down
105 changes: 69 additions & 36 deletions polygon/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ import (
"github.com/erigontech/erigon/turbo/shards"
)

// If there are no waypoints from Heimdall (including in our local database), we won't be able to rely on the last received waypoint
// to determine the root of the canonical chain builder tree. However, we heuristically know the maximum expected height of such a tree.
// Therefore, given a descendant (tip), we can select a block that lags behind it by this constant value and consider it as the root.
// If we happen to choose an older root, it's not a problem since this only temporarily affects the tree size.
//
// Waypoints may be absent in case if it's an early stage of the chain's evolution, starting from the genesis block.
// The current constant value is chosen based on observed metrics in production as twice the doubled value of the maximum observed waypoint length.
const maxFinalizationHeight = 512

type heimdallSynchronizer interface {
IsCatchingUp(ctx context.Context) (bool, error)
SynchronizeCheckpoints(ctx context.Context) (latest *heimdall.Checkpoint, err error)
SynchronizeMilestones(ctx context.Context) (latest *heimdall.Milestone, err error)
SynchronizeCheckpoints(ctx context.Context) (latest *heimdall.Checkpoint, ok bool, err error)
SynchronizeMilestones(ctx context.Context) (latest *heimdall.Milestone, ok bool, err error)
SynchronizeSpans(ctx context.Context, blockNum uint64) error
Ready(ctx context.Context) <-chan error
}
Expand Down Expand Up @@ -760,10 +769,19 @@ func (s *Sync) Run(ctx context.Context) error {
// canonical chain tip.
func (s *Sync) initialiseCcb(ctx context.Context, result syncToTipResult) (*CanonicalChainBuilder, error) {
tip := result.latestTip

tipNum := tip.Number.Uint64()
rootNum := result.latestWaypoint.EndBlock().Uint64()
if rootNum > tipNum {
return nil, fmt.Errorf("unexpected rootNum > tipNum: %d > %d", rootNum, tipNum)
rootNum := uint64(0)

if tipNum > maxFinalizationHeight {
rootNum = tipNum - maxFinalizationHeight
}

if result.latestWaypoint != nil {
rootNum = result.latestWaypoint.EndBlock().Uint64()
if result.latestWaypoint.EndBlock().Uint64() > tipNum {
return nil, fmt.Errorf("unexpected rootNum > tipNum: %d > %d", rootNum, tipNum)
}
}

s.logger.Debug(syncLogPrefix("initialising canonical chain builder"), "rootNum", rootNum, "tipNum", tipNum)
Expand Down Expand Up @@ -801,69 +819,81 @@ type syncToTipResult struct {
}

func (s *Sync) syncToTip(ctx context.Context) (syncToTipResult, error) {
startTime := time.Now()
latestTipOnStart, err := s.execution.CurrentHeader(ctx)
if err != nil {
return syncToTipResult{}, err
}

result, err := s.syncToTipUsingCheckpoints(ctx, latestTipOnStart)
finalisedTip := syncToTipResult{
latestTip: latestTipOnStart,
}

startTime := time.Now()
result, ok, err := s.syncToTipUsingCheckpoints(ctx, finalisedTip.latestTip)
if err != nil {
return syncToTipResult{}, err
}

blocks := result.latestTip.Number.Uint64() - latestTipOnStart.Number.Uint64()
s.logger.Info(
syncLogPrefix("checkpoint sync finished"),
"tip", result.latestTip.Number.Uint64(),
"time", common.PrettyAge(startTime),
"blocks", blocks,
"blk/sec", uint64(float64(blocks)/time.Since(startTime).Seconds()),
)
if ok {
blocks := result.latestTip.Number.Uint64() - finalisedTip.latestTip.Number.Uint64()
s.logger.Info(
syncLogPrefix("checkpoint sync finished"),
"tip", result.latestTip.Number.Uint64(),
"time", common.PrettyAge(startTime),
"blocks", blocks,
"blk/sec", uint64(float64(blocks)/time.Since(startTime).Seconds()),
)

finalisedTip = result
}

startTime = time.Now()
result, err = s.syncToTipUsingMilestones(ctx, result.latestTip)
result, ok, err = s.syncToTipUsingMilestones(ctx, finalisedTip.latestTip)
if err != nil {
return syncToTipResult{}, err
}

blocks = result.latestTip.Number.Uint64() - latestTipOnStart.Number.Uint64()
s.logger.Info(
syncLogPrefix("sync to tip finished"),
"tip", result.latestTip.Number.Uint64(),
"time", common.PrettyAge(startTime),
"blocks", blocks,
"blk/sec", uint64(float64(blocks)/time.Since(startTime).Seconds()),
)
if ok {
blocks := result.latestTip.Number.Uint64() - finalisedTip.latestTip.Number.Uint64()
s.logger.Info(
syncLogPrefix("milestone sync finished"),
"tip", result.latestTip.Number.Uint64(),
"time", common.PrettyAge(startTime),
"blocks", blocks,
"blk/sec", uint64(float64(blocks)/time.Since(startTime).Seconds()),
)
finalisedTip = result
}

return result, nil
return finalisedTip, nil
}

func (s *Sync) syncToTipUsingCheckpoints(ctx context.Context, tip *types.Header) (syncToTipResult, error) {
syncCheckpoints := func(ctx context.Context) (heimdall.Waypoint, error) {
func (s *Sync) syncToTipUsingCheckpoints(ctx context.Context, tip *types.Header) (syncToTipResult, bool, error) {
syncCheckpoints := func(ctx context.Context) (heimdall.Waypoint, bool, error) {
return s.heimdallSync.SynchronizeCheckpoints(ctx)
}
return s.sync(ctx, tip, syncCheckpoints, s.blockDownloader.DownloadBlocksUsingCheckpoints)
}

func (s *Sync) syncToTipUsingMilestones(ctx context.Context, tip *types.Header) (syncToTipResult, error) {
syncMilestones := func(ctx context.Context) (heimdall.Waypoint, error) {
func (s *Sync) syncToTipUsingMilestones(ctx context.Context, tip *types.Header) (syncToTipResult, bool, error) {
syncMilestones := func(ctx context.Context) (heimdall.Waypoint, bool, error) {
return s.heimdallSync.SynchronizeMilestones(ctx)
}
return s.sync(ctx, tip, syncMilestones, s.blockDownloader.DownloadBlocksUsingMilestones)
}

type waypointSyncFunc func(ctx context.Context) (heimdall.Waypoint, error)
type waypointSyncFunc func(ctx context.Context) (heimdall.Waypoint, bool, error)
type blockDownloadFunc func(ctx context.Context, startBlockNum uint64, endBlockNum *uint64) (*types.Header, error)

func (s *Sync) sync(
ctx context.Context,
tip *types.Header,
waypointSync waypointSyncFunc,
blockDownload blockDownloadFunc,
) (syncToTipResult, error) {
) (syncToTipResult, bool, error) {
var waypoint heimdall.Waypoint
var err error
var ok bool

var syncTo *uint64

Expand All @@ -872,17 +902,20 @@ func (s *Sync) sync(
}

for {
waypoint, err = waypointSync(ctx)
waypoint, ok, err = waypointSync(ctx)
if err != nil {
return syncToTipResult{}, err
return syncToTipResult{}, false, err
}
if !ok {
return syncToTipResult{}, false, nil
}

// notify about latest waypoint end block so that eth_syncing API doesn't flicker on initial sync
s.notifications.NewLastBlockSeen(waypoint.EndBlock().Uint64())

newTip, err := blockDownload(ctx, tip.Number.Uint64()+1, syncTo)
if err != nil {
return syncToTipResult{}, err
return syncToTipResult{}, false, err
}

if newTip == nil {
Expand All @@ -894,7 +927,7 @@ func (s *Sync) sync(
// note: if we face a failure during execution of finalized waypoints blocks, it means that
// we're wrong and the blocks are not considered as bad blocks, so we should terminate
err = s.handleWaypointExecutionErr(ctx, tip, err)
return syncToTipResult{}, err
return syncToTipResult{}, false, err
}

tip = newTip
Expand All @@ -906,7 +939,7 @@ func (s *Sync) sync(
}
}

return syncToTipResult{latestTip: tip, latestWaypoint: waypoint}, nil
return syncToTipResult{latestTip: tip, latestWaypoint: waypoint}, true, nil
}

func (s *Sync) handleWaypointExecutionErr(ctx context.Context, lastCorrectTip *types.Header, execErr error) error {
Expand Down

0 comments on commit efdd02a

Please sign in to comment.