From 789e568835a53197095cd2f05389daac293f0f46 Mon Sep 17 00:00:00 2001
From: milen <94537774+taratorio@users.noreply.github.com>
Date: Mon, 16 Sep 2024 09:59:26 +0100
Subject: [PATCH] astrid stage: do not unwind finalized data (#11999)

relates to: https://github.com/erigontech/erigon/issues/11533

but mainly fixes an issue at tip that appeared in a run of the astrid stage integration:
- we fetched event 2900 as soon as it appeared; this event was meant to be assigned to block 11962480
```
DBUG[09-14|13:57:41.878] [bridge] fetched new events periodic progress count=1 lastFetchedEventID=2900 lastFetchedEventTime=2024-09-14T12:57:35Z
```
- we then had a change of forks
```
INFO[09-14|13:58:31.961] [2/6 PolygonSync] new fork - unwinding and caching fork choice unwindNumber=11962436 badHash=0x7e377810370189ece829b48172cc464672393678142cf105aad9212736893b8d cachedTipNumber=11962437 cachedTipHash=0x5076be24d339b5d75b52659dc746d3ceaa3d8dd873f3c34e5655849787d03d2b cachedNewNodes=1
DBUG[09-14|13:58:31.961] UnwindTo block=11962436 block_hash=0x7e377810370189ece829b48172cc464672393678142cf105aad9212736893b8d err=nil stack="[sync.go:171 stage_polygon_sync.go:1437 stage_polygon_sync.go:1390 stage_polygon_sync.go:1609 stage_polygon_sync.go:497 stage_polygon_sync.go:176 default_stages.go:479 sync.go:531 sync.go:410 stageloop.go:249 stageloop.go:101 asm_arm64.s:1222]"
```
- when that happened we unwound the stages and deleted event 2900 (the line below is from a debug print statement; the unwind removes every event with ID > prevSprintLastID)
```
UNWIND prevSprintLastID:2899
```
- then, when the bridge processed block 11962480, the event was no longer in the DB: it had been deleted by the unwind, and the bridge was not aware that it needed to re-fetch it
```
DBUG[09-14|14:00:01.052] [bridge] processing new blocks from=11962480 to=11962480 lastProcessedBlockNum=11962464 lastProcessedBlockTime=1726318767 lastProcessedEventID=2899
```
- this led to a wrong trie root error when block 11962480 was executed
```
EROR[09-14|14:00:01.073] [4/6 Execution] Wrong trie root of block 11962480: 03f9ebde441d041283b85d3cbc3f8fcf240904ae8fa3d28292dd0d6b0f0766a5, expected (from header): c9b520cb589f90323ceb99980658ea7bcc0ffefef0af8d08b74f809a297e31ac. Block hash: 603c84c5e517fd3fefab90a86d9e3acf19cc426886d3c37b4e45679a1659fd6b
```

The long-term solution is to handle unwinds via the Astrid components for the Bridge (events) and Heimdall (spans, checkpoints, milestones), so that they are aware of an unwind and can re-fetch data when needed. However, one observation along the way is that events, spans, checkpoints and milestones are fully finalised data that does not need to be unwound at all: they come from Heimdall's Tendermint BFT consensus, which has single-block finality, and all the APIs we use to scrape this data return finalised data. The only tables that need to be unwound are the ones that depend on bor block numbers, since bor blocks can unwind due to forking (e.g. BorEventNums, BorEventProcessedBlocks).

This PR is a step in that direction, and it also fixes the above issue at chain tip (since we no longer unwind events that have not yet been assigned to a block). It introduces HeimdallUnwindCfg for the Astrid stage and the Bor Heimdall stage. In the Astrid stage we configure it to keep events, spans, checkpoints and milestones and to unwind only the BorEventNums and BorEventProcessedBlocks tables, while in the Bor Heimdall stage we leave the behaviour as is: unwind everything. The `integration stage_polygon_sync --unwind` cmd can override the default behaviour if needed via the `--unwind.types` flag.
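To make the override semantics concrete, here is a minimal, self-contained sketch (not part of the patch; the trimmed `unwindCfg` struct and `applyOverrides` helper are illustrative stand-ins for the `HeimdallUnwindCfg.ApplyUserUnwindTypeOverrides` logic added below). Running e.g. `integration stage_polygon_sync --unwind=128 --unwind.types=events` is intended to unwind only the event-related tables while keeping spans, checkpoints and milestones:

```go
package main

import "fmt"

// Trimmed stand-in for HeimdallUnwindCfg: a true Keep* field means
// "this data is kept, i.e. not unwound".
type unwindCfg struct {
	KeepEvents, KeepSpans, KeepCheckpoints, KeepMilestones bool
}

// applyOverrides mirrors the ApplyUserUnwindTypeOverrides idea: start from
// "unwind everything" and flip Keep* to true for every type the user did
// NOT list in the --unwind.types override.
func applyOverrides(overrides []string) unwindCfg {
	requested := map[string]bool{}
	for _, t := range overrides {
		requested[t] = true
	}
	return unwindCfg{
		KeepEvents:      !requested["events"],
		KeepSpans:       !requested["spans"],
		KeepCheckpoints: !requested["checkpoints"],
		KeepMilestones:  !requested["milestones"],
	}
}

func main() {
	// e.g. integration stage_polygon_sync --unwind=128 --unwind.types=events
	fmt.Printf("%+v\n", applyOverrides([]string{"events"}))
	// prints: {KeepEvents:false KeepSpans:true KeepCheckpoints:true KeepMilestones:true}
}
```

In the actual patch the full config also tracks the BorEventNums, BorEventProcessedBlocks and span block producer selection tables, and an unknown override value panics.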
---
 cmd/integration/commands/flags.go    |   4 +
 cmd/integration/commands/stages.go   |   5 +-
 eth/stagedsync/stage_bor_heimdall.go |  13 ++-
 eth/stagedsync/stage_polygon_sync.go | 163 ++++++++++++++++++++++-----
 turbo/stages/stageloop.go            |   1 +
 5 files changed, 151 insertions(+), 35 deletions(-)

diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go
index d46170c21b5..324f19556e1 100644
--- a/cmd/integration/commands/flags.go
+++ b/cmd/integration/commands/flags.go
@@ -200,3 +200,7 @@ func withCommitment(cmd *cobra.Command) {
 	cmd.Flags().StringVar(&commitmentTrie, "commitment.trie", "hex", "hex - use Hex Patricia Hashed Trie for commitments, bin - use of binary patricia trie")
 	cmd.Flags().IntVar(&commitmentFreq, "commitment.freq", 1000000, "how many blocks to skip between calculating commitment")
 }
+
+func withUnwindTypes(cmd *cobra.Command) {
+	cmd.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "types to unwind for polygon sync")
+}
diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 975db8c642e..fa1d720a4ca 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -504,6 +504,7 @@ func init() {
 	withDataDir(cmdStageBorHeimdall)
 	withReset(cmdStageBorHeimdall)
 	withUnwind(cmdStageBorHeimdall)
+	withUnwindTypes(cmdStageBorHeimdall)
 	withChain(cmdStageBorHeimdall)
 	withHeimdall(cmdStageBorHeimdall)
 	rootCmd.AddCommand(cmdStageBorHeimdall)
@@ -519,6 +520,7 @@ func init() {
 	withDataDir(cmdStagePolygon)
 	withReset(cmdStagePolygon)
 	withUnwind(cmdStagePolygon)
+	withUnwindTypes(cmdStagePolygon)
 	withChain(cmdStagePolygon)
 	withHeimdall(cmdStagePolygon)
 	rootCmd.AddCommand(cmdStagePolygon)
@@ -604,7 +606,6 @@ func init() {
 	cmdSetPrune.Flags().Uint64Var(&pruneTBefore, "prune.t.before", 0, "")
 	cmdSetPrune.Flags().Uint64Var(&pruneCBefore, "prune.c.before", 0, "")
 	cmdSetPrune.Flags().StringSliceVar(&experiments, "experiments", nil, "Storage mode to override database")
-	cmdSetPrune.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "Types to unwind for bor heimdall")
 	rootCmd.AddCommand(cmdSetPrune)
 }
 
@@ -887,7 +888,7 @@ func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error
 	}
 
 	stageState := stage(stageSync, tx, nil, stages.PolygonSync)
-	cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0) // we only need blockReader and blockWriter (blockWriter is constructed in NewPolygonSyncStageCfg)
+	cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0, unwindTypes)
 	if unwind > 0 {
 		u := stageSync.NewUnwindState(stageState.ID, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
 		if err := stagedsync.UnwindPolygonSyncStage(ctx, tx, u, cfg); err != nil {
diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go
index 661560e662d..57c783c8452 100644
--- a/eth/stagedsync/stage_bor_heimdall.go
+++ b/eth/stagedsync/stage_bor_heimdall.go
@@ -68,7 +68,7 @@ type BorHeimdallCfg struct {
 	recents         *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
 	signatures      *lru.ARCCache[libcommon.Hash, libcommon.Address]
 	recordWaypoints bool
-	unwindTypes     []string
+	unwindCfg       HeimdallUnwindCfg
 }
 
 func StageBorHeimdallCfg(
@@ -83,13 +83,18 @@ func StageBorHeimdallCfg(
 	recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
 	signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
 	recordWaypoints bool,
-	unwindTypes []string,
+	userUnwindTypeOverrides []string,
 ) BorHeimdallCfg {
 	var borConfig *borcfg.BorConfig
 	if chainConfig.Bor != nil {
 		borConfig = chainConfig.Bor.(*borcfg.BorConfig)
 	}
 
+	unwindCfg := HeimdallUnwindCfg{} // unwind everything by default
+	if len(userUnwindTypeOverrides) > 0 {
+		unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides)
+	}
+
 	return BorHeimdallCfg{
 		db:     db,
 		snapDb: snapDb,
@@ -103,7 +108,7 @@ func StageBorHeimdallCfg(
 		recents:         recents,
 		signatures:      signatures,
 		recordWaypoints: recordWaypoints,
-		unwindTypes:     unwindTypes,
+		unwindCfg:       unwindCfg,
 	}
 }
 
@@ -841,7 +846,7 @@ func BorHeimdallUnwind(u *UnwindState, ctx context.Context, _ *StageState, tx kv
 		defer tx.Rollback()
 	}
 
-	if err = UnwindHeimdall(tx, u, cfg.unwindTypes); err != nil {
+	if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil {
 		return err
 	}
 
diff --git a/eth/stagedsync/stage_polygon_sync.go b/eth/stagedsync/stage_polygon_sync.go
index f019b409782..81fea073548 100644
--- a/eth/stagedsync/stage_polygon_sync.go
+++ b/eth/stagedsync/stage_polygon_sync.go
@@ -64,6 +64,7 @@ func NewPolygonSyncStageCfg(
 	blockReader services.FullBlockReader,
 	stopNode func() error,
 	blockLimit uint,
+	userUnwindTypeOverrides []string,
 ) PolygonSyncStageCfg {
 	// using a buffered channel to preserve order of tx actions,
 	// do not expect to ever have more than 50 goroutines blocking on this channel
@@ -141,11 +142,25 @@ func NewPolygonSyncStageCfg(
 		txActionStream: txActionStream,
 		stopNode:       stopNode,
 	}
+
+	unwindCfg := HeimdallUnwindCfg{
+		// we keep finalized data, no point in unwinding it
+		KeepEvents:                      true,
+		KeepSpans:                       true,
+		KeepSpanBlockProducerSelections: true,
+		KeepCheckpoints:                 true,
+		KeepMilestones:                  true,
+	}
+	if len(userUnwindTypeOverrides) > 0 {
+		unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides)
+	}
+
 	return PolygonSyncStageCfg{
 		db:          db,
 		service:     syncService,
 		blockReader: blockReader,
 		blockWriter: blockio.NewBlockWriter(),
+		unwindCfg:   unwindCfg,
 	}
 }
 
@@ -154,6 +169,7 @@ type PolygonSyncStageCfg struct {
 	service     *polygonSyncStageService
 	blockReader services.FullBlockReader
 	blockWriter *blockio.BlockWriter
+	unwindCfg   HeimdallUnwindCfg
 }
 
 func ForwardPolygonSyncStage(
@@ -219,7 +235,7 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg
 	}
 
 	// heimdall
-	if err = UnwindHeimdall(tx, u, nil); err != nil {
+	if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil {
 		return err
 	}
 
@@ -246,30 +262,105 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg
 	return nil
 }
 
-func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error {
-	if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "events") {
+type HeimdallUnwindCfg struct {
+	KeepEvents                      bool
+	KeepEventNums                   bool
+	KeepEventProcessedBlocks        bool
+	KeepSpans                       bool
+	KeepSpanBlockProducerSelections bool
+	KeepCheckpoints                 bool
+	KeepMilestones                  bool
+}
+
+func (cfg *HeimdallUnwindCfg) ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides []string) {
+	if len(userUnwindTypeOverrides) == 0 {
+		return
+	}
+
+	// If a user has specified an unwind type override it means we need to unwind all the tables that fall
+	// inside that type but NOT unwind the tables for the types that have not been specified in the overrides.
+	// Our default config value unwinds everything.
+	// If we initialise that and keep track of all the "unseen" unwind type overrides then we can flip our config
+	// to not unwind the tables for the "unseen" types.
+	const events = "events"
+	const spans = "spans"
+	const checkpoints = "checkpoints"
+	const milestones = "milestones"
+	unwindTypes := map[string]struct{}{
+		events:      {},
+		spans:       {},
+		checkpoints: {},
+		milestones:  {},
+	}
+
+	for _, unwindType := range userUnwindTypeOverrides {
+		if _, exists := unwindTypes[unwindType]; !exists {
+			panic("unknown unwindType override " + unwindType)
+		}
+
+		delete(unwindTypes, unwindType)
+	}
+
+	// our config unwinds everything by default
+	defaultCfg := HeimdallUnwindCfg{}
+	// flip the config for the unseen type overrides
+	for unwindType := range unwindTypes {
+		switch unwindType {
+		case events:
+			defaultCfg.KeepEvents = true
+			defaultCfg.KeepEventNums = true
+			defaultCfg.KeepEventProcessedBlocks = true
+		case spans:
+			defaultCfg.KeepSpans = true
+			defaultCfg.KeepSpanBlockProducerSelections = true
+		case checkpoints:
+			defaultCfg.KeepCheckpoints = true
+		case milestones:
+			defaultCfg.KeepMilestones = true
+		default:
+			panic(fmt.Sprintf("missing override logic for unwindType %s, please add it", unwindType))
+		}
+	}
+
+	*cfg = defaultCfg
+}
+
+func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindCfg HeimdallUnwindCfg) error {
+	if !unwindCfg.KeepEvents {
 		if err := UnwindEvents(tx, u.UnwindPoint); err != nil {
 			return err
 		}
 	}
 
-	if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "spans") {
+	if !unwindCfg.KeepEventNums {
+		if err := UnwindEventNums(tx, u.UnwindPoint); err != nil {
+			return err
+		}
+	}
+
+	if !unwindCfg.KeepEventProcessedBlocks {
+		if err := UnwindEventProcessedBlocks(tx, u.UnwindPoint); err != nil {
+			return err
+		}
+	}
+
+	if !unwindCfg.KeepSpans {
 		if err := UnwindSpans(tx, u.UnwindPoint); err != nil {
 			return err
 		}
+	}
 
+	if !unwindCfg.KeepSpanBlockProducerSelections {
 		if err := UnwindSpanBlockProducerSelections(tx, u.UnwindPoint); err != nil {
 			return err
 		}
 	}
 
-	if borsnaptype.CheckpointsEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "checkpoints")) {
+	if borsnaptype.CheckpointsEnabled() && !unwindCfg.KeepCheckpoints {
 		if err := UnwindCheckpoints(tx, u.UnwindPoint); err != nil {
 			return err
 		}
 	}
 
-	if borsnaptype.MilestonesEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "milestones")) {
+	if borsnaptype.MilestonesEnabled() && !unwindCfg.KeepMilestones {
 		if err := UnwindMilestones(tx, u.UnwindPoint); err != nil {
 			return err
 		}
@@ -279,73 +370,87 @@ func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error {
 }
 
 func UnwindEvents(tx kv.RwTx, unwindPoint uint64) error {
-	cursor, err := tx.RwCursor(kv.BorEventNums)
+	eventNumsCursor, err := tx.Cursor(kv.BorEventNums)
 	if err != nil {
 		return err
 	}
-	defer cursor.Close()
+	defer eventNumsCursor.Close()
 
 	var blockNumBuf [8]byte
 	binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
-	_, _, err = cursor.Seek(blockNumBuf[:])
+	_, _, err = eventNumsCursor.Seek(blockNumBuf[:])
 	if err != nil {
 		return err
 	}
 
-	_, prevSprintLastIDBytes, err := cursor.Prev() // last event ID of previous sprint
+	// keep last event ID of previous block with assigned events
+	_, lastEventIdToKeep, err := eventNumsCursor.Prev()
 	if err != nil {
 		return err
 	}
 
-	var prevSprintLastID uint64
-	if prevSprintLastIDBytes == nil {
-		// we are unwinding the first entry, remove all items from BorEvents
-		prevSprintLastID = 0
+	var firstEventIdToRemove uint64
+	if lastEventIdToKeep == nil {
+		// there are no assigned events before the unwind block, remove all items from BorEvents
+		firstEventIdToRemove = 0
 	} else {
-		prevSprintLastID = binary.BigEndian.Uint64(prevSprintLastIDBytes)
+		firstEventIdToRemove = binary.BigEndian.Uint64(lastEventIdToKeep) + 1
 	}
 
-	eventId := make([]byte, 8) // first event ID for this sprint
-	binary.BigEndian.PutUint64(eventId, prevSprintLastID+1)
-
+	from := make([]byte, 8)
+	binary.BigEndian.PutUint64(from, firstEventIdToRemove)
 	eventCursor, err := tx.RwCursor(kv.BorEvents)
 	if err != nil {
 		return err
 	}
 	defer eventCursor.Close()
 
-	for eventId, _, err = eventCursor.Seek(eventId); err == nil && eventId != nil; eventId, _, err = eventCursor.Next() {
+	var k []byte
+	for k, _, err = eventCursor.Seek(from); err == nil && k != nil; k, _, err = eventCursor.Next() {
 		if err = eventCursor.DeleteCurrent(); err != nil {
 			return err
 		}
 	}
-	if err != nil {
-		return err
-	}
 
-	k, _, err := cursor.Next() // move cursor back to this sprint
+	return err
+}
+
+func UnwindEventNums(tx kv.RwTx, unwindPoint uint64) error {
+	c, err := tx.RwCursor(kv.BorEventNums)
 	if err != nil {
 		return err
 	}
-	for ; err == nil && k != nil; k, _, err = cursor.Next() {
-		if err = cursor.DeleteCurrent(); err != nil {
+
+	defer c.Close()
+	var blockNumBuf [8]byte
+	binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
+	var k []byte
+	for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
+		if err = c.DeleteCurrent(); err != nil {
 			return err
 		}
 	}
 
-	epbCursor, err := tx.RwCursor(kv.BorEventProcessedBlocks)
+	return err
+}
+
+func UnwindEventProcessedBlocks(tx kv.RwTx, unwindPoint uint64) error {
+	c, err := tx.RwCursor(kv.BorEventProcessedBlocks)
 	if err != nil {
 		return err
 	}
-	defer epbCursor.Close()
-	for k, _, err = epbCursor.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = epbCursor.Next() {
-		if err = epbCursor.DeleteCurrent(); err != nil {
+
+	defer c.Close()
+	var blockNumBuf [8]byte
+	binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
+	var k []byte
+	for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
+		if err = c.DeleteCurrent(); err != nil {
 			return err
 		}
 	}
 
+	return err
 }
 
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 0e0d3c1f0f8..52bc1f1c571 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -765,6 +765,7 @@ func NewPolygonSyncStages(
 			blockReader,
 			stopNode,
 			config.LoopBlockLimit,
+			nil, /* userUnwindTypeOverrides */
 		),
 		stagedsync.StageSendersCfg(db, chainConfig, config.Sync, false, config.Dirs.Tmp, config.Prune, blockReader, nil),
 		stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications, config.StateStream, false, false, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)),