From e030b7429b764a56707f9755d6ac3d5a585faad0 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Fri, 21 Feb 2025 09:50:47 +0100 Subject: [PATCH] Cherry-pick: fix stuck forward sync (#13892) Cherry pick PR #13883 into `release/3.0` --- cl/phase1/stages/forward_sync.go | 44 +++++--------------------------- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/cl/phase1/stages/forward_sync.go b/cl/phase1/stages/forward_sync.go index 7bde4554c59..68ac8cba87e 100644 --- a/cl/phase1/stages/forward_sync.go +++ b/cl/phase1/stages/forward_sync.go @@ -101,30 +101,6 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf return highestProcessed - 1, err } -func filterUnneededBlocks(ctx context.Context, blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) []*cltypes.SignedBeaconBlock { - filtered := make([]*cltypes.SignedBeaconBlock, 0, len(blocks)) - // Find the latest block in the list - for _, block := range blocks { - blockRoot, err := block.Block.HashSSZ() - if err != nil { - panic(err) - } - _, hasInFcu := cfg.forkChoice.GetHeader(blockRoot) - - var hasSignedHeaderInDB bool - if err = cfg.indiciesDB.View(ctx, func(tx kv.Tx) error { - _, hasSignedHeaderInDB, err = beacon_indicies.ReadSignedHeaderByBlockRoot(ctx, tx, blockRoot) - return err - }); err != nil { - panic(err) - } - if !hasInFcu || !hasSignedHeaderInDB { - filtered = append(filtered, block) - } - } - return filtered -} - // processDownloadedBlockBatches processes a batch of downloaded blocks. // It takes the highest block processed, a flag to determine if insertion is needed, and a list of signed beacon blocks as input. // It returns the new highest block processed and an error if any. @@ -134,12 +110,6 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg * return blocks[i].Block.Slot < blocks[j].Block.Slot }) - // Filter out blocks that are already in the FCU or have a signed header in the DB - blocks = filterUnneededBlocks(ctx, blocks, cfg) - if len(blocks) == 0 { - return highestBlockProcessed, nil - } - var ( blockRoot common.Hash st *state.CachingBeaconState @@ -228,16 +198,16 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg * // forwardSync (MAIN ROUTINE FOR ForwardSync) performs the forward synchronization of beacon blocks. func forwardSync(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { var ( - shouldInsert = cfg.executionClient != nil && cfg.executionClient.SupportInsertion() // Check if the execution client supports insertion - finalizedCheckpoint = cfg.forkChoice.FinalizedCheckpoint() // Get the finalized checkpoint from fork choice - secsPerLog = 30 // Interval in seconds for logging progress - logTicker = time.NewTicker(time.Duration(secsPerLog) * time.Second) // Ticker for logging progress - downloader = network2.NewForwardBeaconDownloader(ctx, cfg.rpc) // Initialize a new forward beacon downloader - currentSlot atomic.Uint64 // Atomic variable to track the current slot + shouldInsert = cfg.executionClient != nil && cfg.executionClient.SupportInsertion() // Check if the execution client supports insertion + startSlot = cfg.forkChoice.HighestSeen() - 8 // Start forwardsync a little bit behind the highest seen slot (account for potential reorgs) + secsPerLog = 30 // Interval in seconds for logging progress + logTicker = time.NewTicker(time.Duration(secsPerLog) * time.Second) // Ticker for logging progress + downloader = network2.NewForwardBeaconDownloader(ctx, cfg.rpc) // Initialize a new forward beacon downloader + currentSlot atomic.Uint64 // Atomic variable to track the current slot ) // Initialize the slot to download from the finalized checkpoint - currentSlot.Store(finalizedCheckpoint.Epoch * cfg.beaconCfg.SlotsPerEpoch) + currentSlot.Store(startSlot) // Always start from the current finalized checkpoint downloader.SetHighestProcessedSlot(currentSlot.Load())