diff --git a/cl/phase1/network/beacon_downloader.go b/cl/phase1/network/beacon_downloader.go index 67626707b8a..64a537eaeea 100644 --- a/cl/phase1/network/beacon_downloader.go +++ b/cl/phase1/network/beacon_downloader.go @@ -76,7 +76,7 @@ type peerAndBlocks struct { } func (f *ForwardBeaconDownloader) RequestMore(ctx context.Context) { - count := uint64(32) + count := uint64(16) var atomicResp atomic.Value atomicResp.Store(peerAndBlocks{}) reqInterval := time.NewTicker(300 * time.Millisecond) @@ -96,11 +96,14 @@ Loop: } // double the request count every 10 seconds. This is inspired by the mekong network, which has many consecutive missing blocks. reqCount := count - if !f.highestSlotUpdateTime.IsZero() { - multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10 - multiplier = min(multiplier, 6) - reqCount *= uint64(1 << uint(multiplier)) - } + // NEED TO COMMENT THIS BC IT CAUSES ISSUES ON MAINNET + + // if !f.highestSlotUpdateTime.IsZero() { + // multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10 + // multiplier = min(multiplier, 6) + // reqCount *= uint64(1 << uint(multiplier)) + // } + // leave a warning if we are stuck for more than 90 seconds if time.Since(f.highestSlotUpdateTime) > 90*time.Second { log.Trace("Forward beacon downloader gets stuck", "time", time.Since(f.highestSlotUpdateTime).Seconds(), "highestSlotProcessed", f.highestSlotProcessed) diff --git a/cl/phase1/stages/forward_sync.go b/cl/phase1/stages/forward_sync.go index e7162b1df8d..bb611eaebd1 100644 --- a/cl/phase1/stages/forward_sync.go +++ b/cl/phase1/stages/forward_sync.go @@ -37,16 +37,18 @@ func shouldProcessBlobs(blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) bool { } // Check if the requested blocks are too old to request blobs // https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#the-reqresp-domain - highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch - currentEpoch := cfg.ethClock.GetCurrentEpoch() - minEpochDist := uint64(0) - if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests { - minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests - } - finalizedEpoch := currentEpoch - 2 - if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) { - return false - } + + // this is bad + // highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch + // currentEpoch := cfg.ethClock.GetCurrentEpoch() + // minEpochDist := uint64(0) + // if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests { + // minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests + // } + // finalizedEpoch := currentEpoch - 2 + // if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) { + // return false + // } return blobsExist } @@ -67,6 +69,7 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf err = fmt.Errorf("failed to get blob identifiers: %w", err) return } + // If there are no blobs to retrieve, return the highest slot processed if ids.Len() == 0 { return highestSlotProcessed, nil @@ -98,6 +101,30 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf return highestProcessed - 1, err } +func filterUnneededBlocks(ctx context.Context, blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) []*cltypes.SignedBeaconBlock { + filtered := make([]*cltypes.SignedBeaconBlock, 0, len(blocks)) + // Find the latest block in the list + for _, block := range blocks { + blockRoot, err := block.Block.HashSSZ() + if err != nil { + panic(err) + } + _, hasInFcu := cfg.forkChoice.GetHeader(blockRoot) + + var hasSignedHeaderInDB bool + if err = cfg.indiciesDB.View(ctx, func(tx kv.Tx) error { + _, hasSignedHeaderInDB, err = beacon_indicies.ReadSignedHeaderByBlockRoot(ctx, tx, blockRoot) + return err + }); err != nil { + panic(err) + } + if !hasInFcu || !hasSignedHeaderInDB { + filtered = append(filtered, block) + } + } + return filtered +} + // processDownloadedBlockBatches processes a batch of downloaded blocks. // It takes the highest block processed, a flag to determine if insertion is needed, and a list of signed beacon blocks as input. // It returns the new highest block processed and an error if any. @@ -107,6 +134,9 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg * return blocks[i].Block.Slot < blocks[j].Block.Slot }) + // Filter out blocks that are already in the FCU or have a signed header in the DB + blocks = filterUnneededBlocks(ctx, blocks, cfg) + var ( blockRoot common.Hash st *state.CachingBeaconState @@ -141,6 +171,7 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg * // Process the block if err = processBlock(ctx, cfg, cfg.indiciesDB, block, false, true, true); err != nil { + fmt.Println("EIP-4844 data not available", err, block.Block.Slot) if errors.Is(err, forkchoice.ErrEIP4844DataNotAvailable) { // Return an error if EIP-4844 data is not available logger.Trace("[Caplin] forward sync EIP-4844 data not available", "blockSlot", block.Block.Slot)