diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index d46170c21b5..324f19556e1 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -200,3 +200,7 @@ func withCommitment(cmd *cobra.Command) { cmd.Flags().StringVar(&commitmentTrie, "commitment.trie", "hex", "hex - use Hex Patricia Hashed Trie for commitments, bin - use of binary patricia trie") cmd.Flags().IntVar(&commitmentFreq, "commitment.freq", 1000000, "how many blocks to skip between calculating commitment") } + +func withUnwindTypes(cmd *cobra.Command) { + cmd.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "types to unwind for polygon sync") +} diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 975db8c642e..fa1d720a4ca 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -504,6 +504,7 @@ func init() { withDataDir(cmdStageBorHeimdall) withReset(cmdStageBorHeimdall) withUnwind(cmdStageBorHeimdall) + withUnwindTypes(cmdStageBorHeimdall) withChain(cmdStageBorHeimdall) withHeimdall(cmdStageBorHeimdall) rootCmd.AddCommand(cmdStageBorHeimdall) @@ -519,6 +520,7 @@ func init() { withDataDir(cmdStagePolygon) withReset(cmdStagePolygon) withUnwind(cmdStagePolygon) + withUnwindTypes(cmdStagePolygon) withChain(cmdStagePolygon) withHeimdall(cmdStagePolygon) rootCmd.AddCommand(cmdStagePolygon) @@ -604,7 +606,6 @@ func init() { cmdSetPrune.Flags().Uint64Var(&pruneTBefore, "prune.t.before", 0, "") cmdSetPrune.Flags().Uint64Var(&pruneCBefore, "prune.c.before", 0, "") cmdSetPrune.Flags().StringSliceVar(&experiments, "experiments", nil, "Storage mode to override database") - cmdSetPrune.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "Types to unwind for bor heimdall") rootCmd.AddCommand(cmdSetPrune) } @@ -887,7 +888,7 @@ func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error } stageState := stage(stageSync, tx, nil, stages.PolygonSync) - cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0) // we only need blockReader and blockWriter (blockWriter is constructed in NewPolygonSyncStageCfg) + cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0, unwindTypes) if unwind > 0 { u := stageSync.NewUnwindState(stageState.ID, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false) if err := stagedsync.UnwindPolygonSyncStage(ctx, tx, u, cfg); err != nil { diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 661560e662d..57c783c8452 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -68,7 +68,7 @@ type BorHeimdallCfg struct { recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot] signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] recordWaypoints bool - unwindTypes []string + unwindCfg HeimdallUnwindCfg } func StageBorHeimdallCfg( @@ -83,13 +83,18 @@ func StageBorHeimdallCfg( recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot], signatures *lru.ARCCache[libcommon.Hash, libcommon.Address], recordWaypoints bool, - unwindTypes []string, + userUnwindTypeOverrides []string, ) BorHeimdallCfg { var borConfig *borcfg.BorConfig if chainConfig.Bor != nil { borConfig = chainConfig.Bor.(*borcfg.BorConfig) } + unwindCfg := HeimdallUnwindCfg{} // unwind everything by default + if len(userUnwindTypeOverrides) > 0 { + unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides) + } + return BorHeimdallCfg{ db: db, snapDb: snapDb, @@ -103,7 +108,7 @@ func StageBorHeimdallCfg( recents: recents, signatures: signatures, recordWaypoints: recordWaypoints, - unwindTypes: unwindTypes, + unwindCfg: unwindCfg, } } @@ -841,7 +846,7 @@ func BorHeimdallUnwind(u *UnwindState, ctx context.Context, _ *StageState, tx kv defer tx.Rollback() } - if err = UnwindHeimdall(tx, u, cfg.unwindTypes); err != nil { + if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil { return err } diff --git a/eth/stagedsync/stage_polygon_sync.go b/eth/stagedsync/stage_polygon_sync.go index f019b409782..81fea073548 100644 --- a/eth/stagedsync/stage_polygon_sync.go +++ b/eth/stagedsync/stage_polygon_sync.go @@ -64,6 +64,7 @@ func NewPolygonSyncStageCfg( blockReader services.FullBlockReader, stopNode func() error, blockLimit uint, + userUnwindTypeOverrides []string, ) PolygonSyncStageCfg { // using a buffered channel to preserve order of tx actions, // do not expect to ever have more than 50 goroutines blocking on this channel @@ -141,11 +142,25 @@ func NewPolygonSyncStageCfg( txActionStream: txActionStream, stopNode: stopNode, } + + unwindCfg := HeimdallUnwindCfg{ + // we keep finalized data, no point in unwinding it + KeepEvents: true, + KeepSpans: true, + KeepSpanBlockProducerSelections: true, + KeepCheckpoints: true, + KeepMilestones: true, + } + if len(userUnwindTypeOverrides) > 0 { + unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides) + } + return PolygonSyncStageCfg{ db: db, service: syncService, blockReader: blockReader, blockWriter: blockio.NewBlockWriter(), + unwindCfg: unwindCfg, } } @@ -154,6 +169,7 @@ type PolygonSyncStageCfg struct { service *polygonSyncStageService blockReader services.FullBlockReader blockWriter *blockio.BlockWriter + unwindCfg HeimdallUnwindCfg } func ForwardPolygonSyncStage( @@ -219,7 +235,7 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg } // heimdall - if err = UnwindHeimdall(tx, u, nil); err != nil { + if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil { return err } @@ -246,30 +262,105 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg return nil } -func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error { - if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "events") { +type HeimdallUnwindCfg struct { + KeepEvents bool + KeepEventNums bool + KeepEventProcessedBlocks bool + KeepSpans bool + KeepSpanBlockProducerSelections bool + KeepCheckpoints bool + KeepMilestones bool +} + +func (cfg *HeimdallUnwindCfg) ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides []string) { + if len(userUnwindTypeOverrides) > 0 { + return + } + + // If a user has specified an unwind type override it means we need to unwind all the tables that fall + // inside that type but NOT unwind the tables for the types that have not been specified in the overrides. + // Our default config value unwinds everything. + // If we initialise that and keep track of all the "unseen" unwind type overrides then we can flip our config + // to not unwind the tables for the "unseen" types. + const events = "events" + const spans = "spans" + const checkpoints = "checkpoints" + const milestones = "milestones" + unwindTypes := map[string]struct{}{ + events: {}, + spans: {}, + checkpoints: {}, + milestones: {}, + } + + for _, unwindType := range userUnwindTypeOverrides { + if _, exists := unwindTypes[unwindType]; !exists { + panic("unknown unwindType override " + unwindType) + } + + delete(unwindTypes, unwindType) + } + + // our config unwinds everything by default + defaultCfg := HeimdallUnwindCfg{} + // flip the config for the unseen type overrides + for unwindType := range unwindTypes { + switch unwindType { + case events: + defaultCfg.KeepEvents = true + defaultCfg.KeepEventNums = true + defaultCfg.KeepEventProcessedBlocks = true + case spans: + defaultCfg.KeepSpans = true + defaultCfg.KeepSpanBlockProducerSelections = true + case checkpoints: + defaultCfg.KeepCheckpoints = true + case milestones: + defaultCfg.KeepMilestones = true + default: + panic(fmt.Sprintf("missing override logic for unwindType %s, please add it", unwindType)) + } + } +} + +func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindCfg HeimdallUnwindCfg) error { + if !unwindCfg.KeepEvents { if err := UnwindEvents(tx, u.UnwindPoint); err != nil { return err } } - if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "spans") { + if !unwindCfg.KeepEventNums { + if err := UnwindEventNums(tx, u.UnwindPoint); err != nil { + return err + } + } + + if !unwindCfg.KeepEventProcessedBlocks { + if err := UnwindEventProcessedBlocks(tx, u.UnwindPoint); err != nil { + return err + } + } + + if !unwindCfg.KeepSpans { if err := UnwindSpans(tx, u.UnwindPoint); err != nil { return err } + } + if !unwindCfg.KeepSpanBlockProducerSelections { if err := UnwindSpanBlockProducerSelections(tx, u.UnwindPoint); err != nil { return err } } - if borsnaptype.CheckpointsEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "checkpoints")) { + if borsnaptype.CheckpointsEnabled() && !unwindCfg.KeepCheckpoints { if err := UnwindCheckpoints(tx, u.UnwindPoint); err != nil { return err } } - if borsnaptype.MilestonesEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "milestones")) { + if borsnaptype.MilestonesEnabled() && !unwindCfg.KeepMilestones { if err := UnwindMilestones(tx, u.UnwindPoint); err != nil { return err } @@ -279,73 +370,87 @@ func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error { } func UnwindEvents(tx kv.RwTx, unwindPoint uint64) error { - cursor, err := tx.RwCursor(kv.BorEventNums) + eventNumsCursor, err := tx.Cursor(kv.BorEventNums) if err != nil { return err } - defer cursor.Close() + defer eventNumsCursor.Close() var blockNumBuf [8]byte binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1) - _, _, err = cursor.Seek(blockNumBuf[:]) + _, _, err = eventNumsCursor.Seek(blockNumBuf[:]) if err != nil { return err } - _, prevSprintLastIDBytes, err := cursor.Prev() // last event ID of previous sprint + // keep last event ID of previous block with assigned events + _, lastEventIdToKeep, err := eventNumsCursor.Prev() if err != nil { return err } - var prevSprintLastID uint64 - if prevSprintLastIDBytes == nil { - // we are unwinding the first entry, remove all items from BorEvents - prevSprintLastID = 0 + var firstEventIdToRemove uint64 + if lastEventIdToKeep == nil { + // there are no assigned events before the unwind block, remove all items from BorEvents + firstEventIdToRemove = 0 } else { - prevSprintLastID = binary.BigEndian.Uint64(prevSprintLastIDBytes) + firstEventIdToRemove = binary.BigEndian.Uint64(lastEventIdToKeep) + 1 } - eventId := make([]byte, 8) // first event ID for this sprint - binary.BigEndian.PutUint64(eventId, prevSprintLastID+1) - + from := make([]byte, 8) + binary.BigEndian.PutUint64(from, firstEventIdToRemove) eventCursor, err := tx.RwCursor(kv.BorEvents) if err != nil { return err } defer eventCursor.Close() - for eventId, _, err = eventCursor.Seek(eventId); err == nil && eventId != nil; eventId, _, err = eventCursor.Next() { + var k []byte + for k, _, err = eventCursor.Seek(from); err == nil && k != nil; k, _, err = eventCursor.Next() { if err = eventCursor.DeleteCurrent(); err != nil { return err } } - if err != nil { - return err - } - k, _, err := cursor.Next() // move cursor back to this sprint + return err +} + +func UnwindEventNums(tx kv.RwTx, unwindPoint uint64) error { + c, err := tx.RwCursor(kv.BorEventNums) if err != nil { return err } - for ; err == nil && k != nil; k, _, err = cursor.Next() { - if err = cursor.DeleteCurrent(); err != nil { + defer c.Close() + var blockNumBuf [8]byte + binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1) + var k []byte + for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() { + if err = c.DeleteCurrent(); err != nil { return err } } - epbCursor, err := tx.RwCursor(kv.BorEventProcessedBlocks) + return err +} + +func UnwindEventProcessedBlocks(tx kv.RwTx, unwindPoint uint64) error { + c, err := tx.RwCursor(kv.BorEventProcessedBlocks) if err != nil { return err } - defer epbCursor.Close() - for k, _, err = epbCursor.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = epbCursor.Next() { - if err = epbCursor.DeleteCurrent(); err != nil { + defer c.Close() + var blockNumBuf [8]byte + binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1) + var k []byte + for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() { + if err = c.DeleteCurrent(); err != nil { return err } } + return err } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 0e0d3c1f0f8..52bc1f1c571 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -765,6 +765,7 @@ func NewPolygonSyncStages( blockReader, stopNode, config.LoopBlockLimit, + nil, /* userUnwindTypeOverrides */ ), stagedsync.StageSendersCfg(db, chainConfig, config.Sync, false, config.Dirs.Tmp, config.Prune, blockReader, nil), stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications, config.StateStream, false, false, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)),