astrid stage: do not unwind finalized data (#11999)
relates to: #11533

but mainly fixes an issue at chain tip that appeared in a run of the astrid
stage via the `integration` tool:
- we fetched event 2900 as soon as it appeared; this event was meant to
be assigned to block 11962480
```
DBUG[09-14|13:57:41.878] [bridge] fetched new events periodic progress count=1 lastFetchedEventID=2900 lastFetchedEventTime=2024-09-14T12:57:35Z
```
- we then had a fork change
```
INFO[09-14|13:58:31.961] [2/6 PolygonSync] new fork - unwinding and caching fork choice unwindNumber=11962436 badHash=0x7e377810370189ece829b48172cc464672393678142cf105aad9212736893b8d cachedTipNumber=11962437 cachedTipHash=0x5076be24d339b5d75b52659dc746d3ceaa3d8dd873f3c34e5655849787d03d2b cachedNewNodes=1
DBUG[09-14|13:58:31.961] UnwindTo                                 block=11962436 block_hash=0x7e377810370189ece829b48172cc464672393678142cf105aad9212736893b8d err=nil stack="[sync.go:171 stage_polygon_sync.go:1437 stage_polygon_sync.go:1390 stage_polygon_sync.go:1609 stage_polygon_sync.go:497 stage_polygon_sync.go:176 default_stages.go:479 sync.go:531 sync.go:410 stageloop.go:249 stageloop.go:101 asm_arm64.s:1222]"
```
- when that happened we unwound the stages and deleted event 2900 (the
line below is from a debug print statement and means: unwind every event
with ID greater than prevSprintLastID; see the sketch after this list)
```
UNWIND prevSprintLastID:2899
```
- then, when the bridge processed block 11962480, the event was not in
the DB because it had been deleted by the unwind, and the bridge was not
aware that it needed to re-fetch it
```
DBUG[09-14|14:00:01.052] [bridge] processing new blocks           from=11962480 to=11962480 lastProcessedBlockNum=11962464 lastProcessedBlockTime=1726318767 lastProcessedEventID=2899
```
- this led to a wrong trie root error when block 11962480 was executed
```
EROR[09-14|14:00:01.073] [4/6 Execution] Wrong trie root of block 11962480: 03f9ebde441d041283b85d3cbc3f8fcf240904ae8fa3d28292dd0d6b0f0766a5, expected (from header): c9b520cb589f90323ceb99980658ea7bcc0ffefef0af8d08b74f809a297e31ac. Block hash: 603c84c5e517fd3fefab90a86d9e3acf19cc426886d3c37b4e45679a1659fd6b 
```
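
For clarity, here is a minimal standalone model of the pre-fix behaviour described above. It uses plain Go maps instead of the real MDBX tables, and the block number that event 2899 belongs to is illustrative (the logs only tell us it sits at or below the unwind point); this is a sketch, not the actual erigon code:

```go
// Simplified model: BorEventNums maps block number -> last event ID assigned to
// that block, BorEvents maps event ID -> payload. The pre-fix unwind deletes
// every event with ID greater than the last ID at or below the unwind point,
// which also removes event 2900 even though it was never assigned to a block.
package main

import "fmt"

func main() {
	borEventNums := map[uint64]uint64{11962432: 2899} // illustrative: last assigned event per block
	borEvents := map[uint64]string{
		2899: "assigned to a block at or below the unwind point",
		2900: "fetched at tip, meant for block 11962480",
	}

	unwindPoint := uint64(11962436)

	// last event ID of the last block at or below the unwind point (2899 here)
	var prevSprintLastID uint64
	for blockNum, lastID := range borEventNums {
		if blockNum <= unwindPoint && lastID > prevSprintLastID {
			prevSprintLastID = lastID
		}
	}

	// pre-fix behaviour: delete everything above prevSprintLastID
	for id := range borEvents {
		if id > prevSprintLastID {
			delete(borEvents, id)
		}
	}

	fmt.Println(borEvents) // event 2900 is gone and the bridge will not re-fetch it
}
```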

The long-term solution is to handle unwinds via the Astrid components for
the Bridge (events) and Heimdall (spans, checkpoints, milestones), so that
they are aware of unwinds and can re-fetch data if needed.

However, one observation along the way is that events, spans, checkpoints
and milestones are fully finalised data that does not need to be unwound.
They come from Heimdall's Tendermint BFT consensus, which has single-block
finality, and all the APIs we use to scrape this data return finalised data.
The only tables that need to be unwound are the ones that depend on bor
blocks (since those can unwind due to bor forking), e.g. BorEventNums and
BorEventProcessedBlocks.
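
Expressed with the config fields this PR introduces (a sketch only: the struct is redeclared locally so the snippet is self-contained, and it mirrors the Astrid-stage default added in stage_polygon_sync.go below):

```go
// Sketch: which tables are kept vs unwound for the Astrid stage by default.
package main

import "fmt"

// local copy of the fields of HeimdallUnwindCfg added in this PR
type HeimdallUnwindCfg struct {
	KeepEvents                      bool
	KeepEventNums                   bool
	KeepEventProcessedBlocks        bool
	KeepSpans                       bool
	KeepSpanBlockProducerSelections bool
	KeepCheckpoints                 bool
	KeepMilestones                  bool
}

func main() {
	astridDefault := HeimdallUnwindCfg{
		// finalised Heimdall data - never needs to be unwound
		KeepEvents:                      true,
		KeepSpans:                       true,
		KeepSpanBlockProducerSelections: true,
		KeepCheckpoints:                 true,
		KeepMilestones:                  true,
		// KeepEventNums and KeepEventProcessedBlocks stay false: BorEventNums and
		// BorEventProcessedBlocks are keyed by bor block numbers, so they must be unwound.
	}
	fmt.Printf("%+v\n", astridDefault)
}
```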

This PR is a step in that direction, and it also fixes the above issue
at chain tip (since unassigned events are no longer unwound). It
introduces HeimdallUnwindCfg for the Astrid stage and the Bor Heimdall
stage. In the Astrid stage we configure it to not unwind events, spans,
checkpoints and milestones (only the BorEventNums and
BorEventProcessedBlocks tables), while in the Bor Heimdall stage we leave
it as is - unwinding everything. The `integration stage_polygon_sync
--unwind` cmd can override the default behaviour if needed via the
`--unwind.types` flag.
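
For illustration, a standalone restatement of the override semantics (not the erigon implementation; it just mirrors what ApplyUserUnwindTypeOverrides in the diff below does - the types listed by the user are unwound, everything else is kept):

```go
// e.g. `--unwind.types=events`: unwind only the events-related tables
// (BorEvents, BorEventNums, BorEventProcessedBlocks) and keep
// spans, checkpoints and milestones.
package main

import "fmt"

func main() {
	userOverrides := []string{"events"} // value of --unwind.types

	keep := map[string]bool{"events": true, "spans": true, "checkpoints": true, "milestones": true}
	for _, t := range userOverrides {
		if _, ok := keep[t]; !ok {
			panic("unknown unwind type override: " + t)
		}
		keep[t] = false // listed types get unwound
	}

	fmt.Println(keep) // map[checkpoints:true events:false milestones:true spans:true]
}
```
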
taratorio authored Sep 16, 2024
1 parent 046aa8b commit 789e568
Showing 5 changed files with 151 additions and 35 deletions.
4 changes: 4 additions & 0 deletions cmd/integration/commands/flags.go
@@ -200,3 +200,7 @@ func withCommitment(cmd *cobra.Command) {
cmd.Flags().StringVar(&commitmentTrie, "commitment.trie", "hex", "hex - use Hex Patricia Hashed Trie for commitments, bin - use of binary patricia trie")
cmd.Flags().IntVar(&commitmentFreq, "commitment.freq", 1000000, "how many blocks to skip between calculating commitment")
}

func withUnwindTypes(cmd *cobra.Command) {
cmd.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "types to unwind for polygon sync")
}
5 changes: 3 additions & 2 deletions cmd/integration/commands/stages.go
@@ -504,6 +504,7 @@ func init() {
withDataDir(cmdStageBorHeimdall)
withReset(cmdStageBorHeimdall)
withUnwind(cmdStageBorHeimdall)
withUnwindTypes(cmdStageBorHeimdall)
withChain(cmdStageBorHeimdall)
withHeimdall(cmdStageBorHeimdall)
rootCmd.AddCommand(cmdStageBorHeimdall)
@@ -519,6 +520,7 @@ func init() {
withDataDir(cmdStagePolygon)
withReset(cmdStagePolygon)
withUnwind(cmdStagePolygon)
withUnwindTypes(cmdStagePolygon)
withChain(cmdStagePolygon)
withHeimdall(cmdStagePolygon)
rootCmd.AddCommand(cmdStagePolygon)
@@ -604,7 +606,6 @@ func init() {
cmdSetPrune.Flags().Uint64Var(&pruneTBefore, "prune.t.before", 0, "")
cmdSetPrune.Flags().Uint64Var(&pruneCBefore, "prune.c.before", 0, "")
cmdSetPrune.Flags().StringSliceVar(&experiments, "experiments", nil, "Storage mode to override database")
cmdSetPrune.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "Types to unwind for bor heimdall")
rootCmd.AddCommand(cmdSetPrune)
}

@@ -887,7 +888,7 @@ func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error
}

stageState := stage(stageSync, tx, nil, stages.PolygonSync)
cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0) // we only need blockReader and blockWriter (blockWriter is constructed in NewPolygonSyncStageCfg)
cfg := stagedsync.NewPolygonSyncStageCfg(logger, chainConfig, nil, heimdallClient, nil, 0, nil, blockReader, nil, 0, unwindTypes)
if unwind > 0 {
u := stageSync.NewUnwindState(stageState.ID, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
if err := stagedsync.UnwindPolygonSyncStage(ctx, tx, u, cfg); err != nil {
13 changes: 9 additions & 4 deletions eth/stagedsync/stage_bor_heimdall.go
@@ -68,7 +68,7 @@ type BorHeimdallCfg struct {
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address]
recordWaypoints bool
unwindTypes []string
unwindCfg HeimdallUnwindCfg
}

func StageBorHeimdallCfg(
@@ -83,13 +83,18 @@ func StageBorHeimdallCfg(
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
recordWaypoints bool,
unwindTypes []string,
userUnwindTypeOverrides []string,
) BorHeimdallCfg {
var borConfig *borcfg.BorConfig
if chainConfig.Bor != nil {
borConfig = chainConfig.Bor.(*borcfg.BorConfig)
}

unwindCfg := HeimdallUnwindCfg{} // unwind everything by default
if len(userUnwindTypeOverrides) > 0 {
unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides)
}

return BorHeimdallCfg{
db: db,
snapDb: snapDb,
@@ -103,7 +108,7 @@
recents: recents,
signatures: signatures,
recordWaypoints: recordWaypoints,
unwindTypes: unwindTypes,
unwindCfg: unwindCfg,
}
}

@@ -841,7 +846,7 @@ func BorHeimdallUnwind(u *UnwindState, ctx context.Context, _ *StageState, tx kv
defer tx.Rollback()
}

if err = UnwindHeimdall(tx, u, cfg.unwindTypes); err != nil {
if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil {
return err
}

163 changes: 134 additions & 29 deletions eth/stagedsync/stage_polygon_sync.go
@@ -64,6 +64,7 @@ func NewPolygonSyncStageCfg(
blockReader services.FullBlockReader,
stopNode func() error,
blockLimit uint,
userUnwindTypeOverrides []string,
) PolygonSyncStageCfg {
// using a buffered channel to preserve order of tx actions,
// do not expect to ever have more than 50 goroutines blocking on this channel
@@ -141,11 +142,25 @@
txActionStream: txActionStream,
stopNode: stopNode,
}

unwindCfg := HeimdallUnwindCfg{
// we keep finalized data, no point in unwinding it
KeepEvents: true,
KeepSpans: true,
KeepSpanBlockProducerSelections: true,
KeepCheckpoints: true,
KeepMilestones: true,
}
if len(userUnwindTypeOverrides) > 0 {
unwindCfg.ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides)
}

return PolygonSyncStageCfg{
db: db,
service: syncService,
blockReader: blockReader,
blockWriter: blockio.NewBlockWriter(),
unwindCfg: unwindCfg,
}
}

@@ -154,6 +169,7 @@ type PolygonSyncStageCfg struct {
service *polygonSyncStageService
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
unwindCfg HeimdallUnwindCfg
}

func ForwardPolygonSyncStage(
Expand Down Expand Up @@ -219,7 +235,7 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg
}

// heimdall
if err = UnwindHeimdall(tx, u, nil); err != nil {
if err = UnwindHeimdall(tx, u, cfg.unwindCfg); err != nil {
return err
}

@@ -246,30 +262,105 @@ func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg
return nil
}

func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error {
if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "events") {
type HeimdallUnwindCfg struct {
KeepEvents bool
KeepEventNums bool
KeepEventProcessedBlocks bool
KeepSpans bool
KeepSpanBlockProducerSelections bool
KeepCheckpoints bool
KeepMilestones bool
}

func (cfg *HeimdallUnwindCfg) ApplyUserUnwindTypeOverrides(userUnwindTypeOverrides []string) {
if len(userUnwindTypeOverrides) == 0 {
return
}

// If a user has specified an unwind type override it means we need to unwind all the tables that fall
// inside that type but NOT unwind the tables for the types that have not been specified in the overrides.
// Our default config value unwinds everything.
// If we initialise that and keep track of all the "unseen" unwind type overrides then we can flip our config
// to not unwind the tables for the "unseen" types.
const events = "events"
const spans = "spans"
const checkpoints = "checkpoints"
const milestones = "milestones"
unwindTypes := map[string]struct{}{
events: {},
spans: {},
checkpoints: {},
milestones: {},
}

for _, unwindType := range userUnwindTypeOverrides {
if _, exists := unwindTypes[unwindType]; !exists {
panic("unknown unwindType override " + unwindType)
}

delete(unwindTypes, unwindType)
}

// our config unwinds everything by default
defaultCfg := HeimdallUnwindCfg{}
// flip the config for the unseen type overrides
for unwindType := range unwindTypes {
switch unwindType {
case events:
defaultCfg.KeepEvents = true
defaultCfg.KeepEventNums = true
defaultCfg.KeepEventProcessedBlocks = true
case spans:
defaultCfg.KeepSpans = true
defaultCfg.KeepSpanBlockProducerSelections = true
case checkpoints:
defaultCfg.KeepCheckpoints = true
case milestones:
defaultCfg.KeepMilestones = true
default:
panic(fmt.Sprintf("missing override logic for unwindType %s, please add it", unwindType))
}
}

*cfg = defaultCfg
}

func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindCfg HeimdallUnwindCfg) error {
if !unwindCfg.KeepEvents {
if err := UnwindEvents(tx, u.UnwindPoint); err != nil {
return err
}
}

if len(unwindTypes) == 0 || slices.Contains(unwindTypes, "spans") {
if !unwindCfg.KeepEventNums {
if err := UnwindEventNums(tx, u.UnwindPoint); err != nil {
return err
}
}

if !unwindCfg.KeepEventProcessedBlocks {
if err := UnwindEventProcessedBlocks(tx, u.UnwindPoint); err != nil {
return err
}
}

if !unwindCfg.KeepSpans {
if err := UnwindSpans(tx, u.UnwindPoint); err != nil {
return err
}
}

if !unwindCfg.KeepSpanBlockProducerSelections {
if err := UnwindSpanBlockProducerSelections(tx, u.UnwindPoint); err != nil {
return err
}
}

if borsnaptype.CheckpointsEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "checkpoints")) {
if borsnaptype.CheckpointsEnabled() && !unwindCfg.KeepCheckpoints {
if err := UnwindCheckpoints(tx, u.UnwindPoint); err != nil {
return err
}
}

if borsnaptype.MilestonesEnabled() && (len(unwindTypes) == 0 || slices.Contains(unwindTypes, "milestones")) {
if borsnaptype.MilestonesEnabled() && !unwindCfg.KeepMilestones {
if err := UnwindMilestones(tx, u.UnwindPoint); err != nil {
return err
}
@@ -279,73 +370,87 @@ func UnwindHeimdall(tx kv.RwTx, u *UnwindState, unwindTypes []string) error {
}

func UnwindEvents(tx kv.RwTx, unwindPoint uint64) error {
cursor, err := tx.RwCursor(kv.BorEventNums)
eventNumsCursor, err := tx.Cursor(kv.BorEventNums)
if err != nil {
return err
}
defer cursor.Close()
defer eventNumsCursor.Close()

var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)

_, _, err = cursor.Seek(blockNumBuf[:])
_, _, err = eventNumsCursor.Seek(blockNumBuf[:])
if err != nil {
return err
}

_, prevSprintLastIDBytes, err := cursor.Prev() // last event ID of previous sprint
// keep last event ID of previous block with assigned events
_, lastEventIdToKeep, err := eventNumsCursor.Prev()
if err != nil {
return err
}

var prevSprintLastID uint64
if prevSprintLastIDBytes == nil {
// we are unwinding the first entry, remove all items from BorEvents
prevSprintLastID = 0
var firstEventIdToRemove uint64
if lastEventIdToKeep == nil {
// there are no assigned events before the unwind block, remove all items from BorEvents
firstEventIdToRemove = 0
} else {
prevSprintLastID = binary.BigEndian.Uint64(prevSprintLastIDBytes)
firstEventIdToRemove = binary.BigEndian.Uint64(lastEventIdToKeep) + 1
}

eventId := make([]byte, 8) // first event ID for this sprint
binary.BigEndian.PutUint64(eventId, prevSprintLastID+1)

from := make([]byte, 8)
binary.BigEndian.PutUint64(from, firstEventIdToRemove)
eventCursor, err := tx.RwCursor(kv.BorEvents)
if err != nil {
return err
}
defer eventCursor.Close()

for eventId, _, err = eventCursor.Seek(eventId); err == nil && eventId != nil; eventId, _, err = eventCursor.Next() {
var k []byte
for k, _, err = eventCursor.Seek(from); err == nil && k != nil; k, _, err = eventCursor.Next() {
if err = eventCursor.DeleteCurrent(); err != nil {
return err
}
}
if err != nil {
return err
}

k, _, err := cursor.Next() // move cursor back to this sprint
return err
}

func UnwindEventNums(tx kv.RwTx, unwindPoint uint64) error {
c, err := tx.RwCursor(kv.BorEventNums)
if err != nil {
return err
}

for ; err == nil && k != nil; k, _, err = cursor.Next() {
if err = cursor.DeleteCurrent(); err != nil {
defer c.Close()
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
var k []byte
for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
if err = c.DeleteCurrent(); err != nil {
return err
}
}

epbCursor, err := tx.RwCursor(kv.BorEventProcessedBlocks)
return err
}

func UnwindEventProcessedBlocks(tx kv.RwTx, unwindPoint uint64) error {
c, err := tx.RwCursor(kv.BorEventProcessedBlocks)
if err != nil {
return err
}

defer epbCursor.Close()
for k, _, err = epbCursor.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = epbCursor.Next() {
if err = epbCursor.DeleteCurrent(); err != nil {
defer c.Close()
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], unwindPoint+1)
var k []byte
for k, _, err = c.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = c.Next() {
if err = c.DeleteCurrent(); err != nil {
return err
}
}

return err
}

1 change: 1 addition & 0 deletions turbo/stages/stageloop.go
@@ -765,6 +765,7 @@ func NewPolygonSyncStages(
blockReader,
stopNode,
config.LoopBlockLimit,
nil, /* userUnwindTypeOverrides */
),
stagedsync.StageSendersCfg(db, chainConfig, config.Sync, false, config.Dirs.Tmp, config.Prune, blockReader, nil),
stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications, config.StateStream, false, false, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)),
