Skip to content

Commit

Permalink
Caplin: fix goddamn forward sync (#13831)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Feb 16, 2025
1 parent be6dc84 commit 10e2e49
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
15 changes: 9 additions & 6 deletions cl/phase1/network/beacon_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type peerAndBlocks struct {
}

func (f *ForwardBeaconDownloader) RequestMore(ctx context.Context) {
count := uint64(32)
count := uint64(16)
var atomicResp atomic.Value
atomicResp.Store(peerAndBlocks{})
reqInterval := time.NewTicker(300 * time.Millisecond)
Expand All @@ -96,11 +96,14 @@ Loop:
}
// double the request count every 10 seconds. This is inspired by the mekong network, which has many consecutive missing blocks.
reqCount := count
if !f.highestSlotUpdateTime.IsZero() {
multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10
multiplier = min(multiplier, 6)
reqCount *= uint64(1 << uint(multiplier))
}
// NEED TO COMMENT THIS BC IT CAUSES ISSUES ON MAINNET

// if !f.highestSlotUpdateTime.IsZero() {
// multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10
// multiplier = min(multiplier, 6)
// reqCount *= uint64(1 << uint(multiplier))
// }

// leave a warning if we are stuck for more than 90 seconds
if time.Since(f.highestSlotUpdateTime) > 90*time.Second {
log.Trace("Forward beacon downloader gets stuck", "time", time.Since(f.highestSlotUpdateTime).Seconds(), "highestSlotProcessed", f.highestSlotProcessed)
Expand Down
51 changes: 41 additions & 10 deletions cl/phase1/stages/forward_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ func shouldProcessBlobs(blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) bool {
}
// Check if the requested blocks are too old to request blobs
// https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#the-reqresp-domain
highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch
currentEpoch := cfg.ethClock.GetCurrentEpoch()
minEpochDist := uint64(0)
if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests {
minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests
}
finalizedEpoch := currentEpoch - 2
if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) {
return false
}

// this is bad
// highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch
// currentEpoch := cfg.ethClock.GetCurrentEpoch()
// minEpochDist := uint64(0)
// if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests {
// minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests
// }
// finalizedEpoch := currentEpoch - 2
// if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) {
// return false
// }

return blobsExist
}
Expand All @@ -67,6 +69,7 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf
err = fmt.Errorf("failed to get blob identifiers: %w", err)
return
}

// If there are no blobs to retrieve, return the highest slot processed
if ids.Len() == 0 {
return highestSlotProcessed, nil
Expand Down Expand Up @@ -98,6 +101,30 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf
return highestProcessed - 1, err
}

func filterUnneededBlocks(ctx context.Context, blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) []*cltypes.SignedBeaconBlock {
filtered := make([]*cltypes.SignedBeaconBlock, 0, len(blocks))
// Find the latest block in the list
for _, block := range blocks {
blockRoot, err := block.Block.HashSSZ()
if err != nil {
panic(err)
}
_, hasInFcu := cfg.forkChoice.GetHeader(blockRoot)

var hasSignedHeaderInDB bool
if err = cfg.indiciesDB.View(ctx, func(tx kv.Tx) error {
_, hasSignedHeaderInDB, err = beacon_indicies.ReadSignedHeaderByBlockRoot(ctx, tx, blockRoot)
return err
}); err != nil {
panic(err)
}
if !hasInFcu || !hasSignedHeaderInDB {
filtered = append(filtered, block)
}
}
return filtered
}

// processDownloadedBlockBatches processes a batch of downloaded blocks.
// It takes the highest block processed, a flag to determine if insertion is needed, and a list of signed beacon blocks as input.
// It returns the new highest block processed and an error if any.
Expand All @@ -107,6 +134,9 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg *
return blocks[i].Block.Slot < blocks[j].Block.Slot
})

// Filter out blocks that are already in the FCU or have a signed header in the DB
blocks = filterUnneededBlocks(ctx, blocks, cfg)

var (
blockRoot common.Hash
st *state.CachingBeaconState
Expand Down Expand Up @@ -141,6 +171,7 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg *

// Process the block
if err = processBlock(ctx, cfg, cfg.indiciesDB, block, false, true, true); err != nil {
fmt.Println("EIP-4844 data not available", err, block.Block.Slot)
if errors.Is(err, forkchoice.ErrEIP4844DataNotAvailable) {
// Return an error if EIP-4844 data is not available
logger.Trace("[Caplin] forward sync EIP-4844 data not available", "blockSlot", block.Block.Slot)
Expand Down

0 comments on commit 10e2e49

Please sign in to comment.