diff --git a/nil/internal/collate/syncer.go b/nil/internal/collate/syncer.go index 0a61345f..df13248b 100644 --- a/nil/internal/collate/syncer.go +++ b/nil/internal/collate/syncer.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "math" "slices" "sync" "time" @@ -46,9 +45,6 @@ type Syncer struct { logger zerolog.Logger - lastBlockNumber types.BlockNumber - lastBlockHash common.Hash - validatorPublicKey []byte waitForSync *sync.WaitGroup } @@ -69,41 +65,34 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy logger: logging.NewLogger("sync").With(). Stringer(logging.FieldShardId, cfg.ShardId). Logger(), - lastBlockNumber: types.BlockNumber(math.MaxUint64), validatorPublicKey: crypto.CompressPubkey(cfg.ValidatorPublicKey), waitForSync: &waitForSync, }, nil } -func (s *Syncer) readLastBlockNumber(ctx context.Context) error { +func (s *Syncer) readLastBlock(ctx context.Context) (*types.Block, common.Hash, error) { rotx, err := s.db.CreateRoTx(ctx) if err != nil { - return err + return nil, common.EmptyHash, err } defer rotx.Rollback() - lastBlock, lastBlockHash, err := db.ReadLastBlock(rotx, s.config.ShardId) + + block, hash, err := db.ReadLastBlock(rotx, s.config.ShardId) if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return err + return nil, common.EmptyHash, err } if err == nil { - s.lastBlockNumber = lastBlock.Id - s.lastBlockHash = lastBlockHash + return block, hash, err } - return nil + return nil, common.EmptyHash, nil } func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) { - rotx, err := s.db.CreateRoTx(ctx) + block, _, err := s.readLastBlock(ctx) if err != nil { return false, err } - defer rotx.Rollback() - - _, err = db.ReadLastBlockHash(rotx, s.config.ShardId) - if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return false, err - } - return err != nil, nil + return block == nil, nil } func (s *Syncer) WaitComplete() { @@ -135,15 +124,15 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { return nil } - err := s.readLastBlockNumber(ctx) + block, hash, err := s.readLastBlock(ctx) if err != nil { return fmt.Errorf("failed to read last block number: %w", err) } s.logger.Debug(). - Stringer(logging.FieldBlockHash, s.lastBlockHash). - Uint64(logging.FieldBlockNumber, s.lastBlockNumber.Uint64()). - Msgf("Initialized sync collator at starting block") + Stringer(logging.FieldBlockHash, hash). + Uint64(logging.FieldBlockNumber, uint64(block.Id)). + Msg("Initialized sync collator at starting block") s.logger.Info().Msg("Starting sync") @@ -198,17 +187,22 @@ func (s *Syncer) processTopicTransaction(ctx context.Context, data []byte) (bool Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)). Msg("Received block") - if block.Id != s.lastBlockNumber+1 { + lastBlock, lastHash, err := s.readLastBlock(ctx) + if err != nil { + return false, err + } + + if block.Id != lastBlock.Id+1 { s.logger.Debug(). Stringer(logging.FieldBlockNumber, block.Id). - Msgf("Received block is out of order with the last block %d", s.lastBlockNumber) + Msgf("Received block is out of order with the last block %d", lastBlock.Id) // todo: queue the block for later processing return false, nil } - if block.PrevBlock != s.lastBlockHash { - txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", s.lastBlockHash, block.PrevBlock) + if block.PrevBlock != lastHash { + txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", lastHash, block.PrevBlock) s.logger.Error(). Stringer(logging.FieldBlockNumber, block.Id). Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)). @@ -250,11 +244,16 @@ func (s *Syncer) fetchBlocksRange(ctx context.Context) []*types.BlockWithExtract s.logger.Debug().Msgf("Found %d peers to fetch block from:\n%v", len(peers), peers) + lastBlock, _, err := s.readLastBlock(ctx) + if err != nil { + return nil + } + for _, p := range peers { - s.logger.Debug().Msgf("Requesting block %d from peer %s", s.lastBlockNumber+1, p) + s.logger.Debug().Msgf("Requesting block %d from peer %s", lastBlock.Id+1, p) const count = 100 - blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, s.lastBlockNumber+1, count) + blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, lastBlock.Id+1, count) if err == nil { return blocks } @@ -304,11 +303,9 @@ func (s *Syncer) saveBlocks(ctx context.Context, blocks []*types.BlockWithExtrac } } - s.lastBlockNumber = blocks[len(blocks)-1].Block.Id - s.lastBlockHash = blocks[len(blocks)-1].Block.Hash(s.config.ShardId) - + lastBlockNumber := blocks[len(blocks)-1].Block.Id s.logger.Debug(). - Stringer(logging.FieldBlockNumber, s.lastBlockNumber). + Stringer(logging.FieldBlockNumber, lastBlockNumber). Msg("Blocks written") return nil