Skip to content

Commit

Permalink
drop lastBlockNumber/Hash from syncer
Browse files Browse the repository at this point in the history
Syncer works at the same time with consensus and block generation.
So it's possible that we get new blocks via a network that was previously
committed via consensus.

In future we can cache it somehow but it should be done in separate module
that keeps fresh blockchain state.
  • Loading branch information
olegrok committed Jan 30, 2025
1 parent 522cafd commit 07860a7
Showing 1 changed file with 31 additions and 34 deletions.
65 changes: 31 additions & 34 deletions nil/internal/collate/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -46,9 +45,6 @@ type Syncer struct {

logger zerolog.Logger

lastBlockNumber types.BlockNumber
lastBlockHash common.Hash

validatorPublicKey []byte
waitForSync *sync.WaitGroup
}
Expand All @@ -69,41 +65,34 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy
logger: logging.NewLogger("sync").With().
Stringer(logging.FieldShardId, cfg.ShardId).
Logger(),
lastBlockNumber: types.BlockNumber(math.MaxUint64),
validatorPublicKey: crypto.CompressPubkey(cfg.ValidatorPublicKey),
waitForSync: &waitForSync,
}, nil
}

func (s *Syncer) readLastBlockNumber(ctx context.Context) error {
func (s *Syncer) readLastBlock(ctx context.Context) (*types.Block, common.Hash, error) {
rotx, err := s.db.CreateRoTx(ctx)
if err != nil {
return err
return nil, common.EmptyHash, err
}
defer rotx.Rollback()
lastBlock, lastBlockHash, err := db.ReadLastBlock(rotx, s.config.ShardId)

block, hash, err := db.ReadLastBlock(rotx, s.config.ShardId)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return err
return nil, common.EmptyHash, err
}
if err == nil {
s.lastBlockNumber = lastBlock.Id
s.lastBlockHash = lastBlockHash
return block, hash, err
}
return nil
return nil, common.EmptyHash, nil
}

func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) {
rotx, err := s.db.CreateRoTx(ctx)
block, _, err := s.readLastBlock(ctx)
if err != nil {
return false, err
}
defer rotx.Rollback()

_, err = db.ReadLastBlockHash(rotx, s.config.ShardId)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return false, err
}
return err != nil, nil
return block == nil, nil
}

func (s *Syncer) WaitComplete() {
Expand Down Expand Up @@ -135,15 +124,15 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
return nil
}

err := s.readLastBlockNumber(ctx)
block, hash, err := s.readLastBlock(ctx)
if err != nil {
return fmt.Errorf("failed to read last block number: %w", err)
}

s.logger.Debug().
Stringer(logging.FieldBlockHash, s.lastBlockHash).
Uint64(logging.FieldBlockNumber, s.lastBlockNumber.Uint64()).
Msgf("Initialized sync collator at starting block")
Stringer(logging.FieldBlockHash, hash).
Uint64(logging.FieldBlockNumber, uint64(block.Id)).
Msg("Initialized sync collator at starting block")

s.logger.Info().Msg("Starting sync")

Expand Down Expand Up @@ -198,17 +187,22 @@ func (s *Syncer) processTopicTransaction(ctx context.Context, data []byte) (bool
Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)).
Msg("Received block")

if block.Id != s.lastBlockNumber+1 {
lastBlock, lastHash, err := s.readLastBlock(ctx)
if err != nil {
return false, err
}

if block.Id != lastBlock.Id+1 {
s.logger.Debug().
Stringer(logging.FieldBlockNumber, block.Id).
Msgf("Received block is out of order with the last block %d", s.lastBlockNumber)
Msgf("Received block is out of order with the last block %d", lastBlock.Id)

// todo: queue the block for later processing
return false, nil
}

if block.PrevBlock != s.lastBlockHash {
txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", s.lastBlockHash, block.PrevBlock)
if block.PrevBlock != lastHash {
txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", lastHash, block.PrevBlock)
s.logger.Error().
Stringer(logging.FieldBlockNumber, block.Id).
Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)).
Expand Down Expand Up @@ -250,11 +244,16 @@ func (s *Syncer) fetchBlocksRange(ctx context.Context) []*types.BlockWithExtract

s.logger.Debug().Msgf("Found %d peers to fetch block from:\n%v", len(peers), peers)

lastBlock, _, err := s.readLastBlock(ctx)
if err != nil {
return nil
}

for _, p := range peers {
s.logger.Debug().Msgf("Requesting block %d from peer %s", s.lastBlockNumber+1, p)
s.logger.Debug().Msgf("Requesting block %d from peer %s", lastBlock.Id+1, p)

const count = 100
blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, s.lastBlockNumber+1, count)
blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, lastBlock.Id+1, count)
if err == nil {
return blocks
}
Expand Down Expand Up @@ -304,11 +303,9 @@ func (s *Syncer) saveBlocks(ctx context.Context, blocks []*types.BlockWithExtrac
}
}

s.lastBlockNumber = blocks[len(blocks)-1].Block.Id
s.lastBlockHash = blocks[len(blocks)-1].Block.Hash(s.config.ShardId)

lastBlockNumber := blocks[len(blocks)-1].Block.Id
s.logger.Debug().
Stringer(logging.FieldBlockNumber, s.lastBlockNumber).
Stringer(logging.FieldBlockNumber, lastBlockNumber).
Msg("Blocks written")

return nil
Expand Down

0 comments on commit 07860a7

Please sign in to comment.