Skip to content

Commit

Permalink
Merge pull request #67 from NilFoundation/remove-from-pool
Browse files Browse the repository at this point in the history
Remove failed txns from proposal
  • Loading branch information
dmtrskv authored Feb 1, 2025
2 parents 3655e36 + 2384ec8 commit f0f1ef0
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 13 deletions.
28 changes: 25 additions & 3 deletions nil/internal/collate/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/internal/execution"
"github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/txnpool"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -181,6 +182,7 @@ func (p *proposer) handleTransactionsFromPool() error {

sa := execution.NewStateAccessor()

var duplicates, unverified []*types.Transaction
handle := func(txn *types.Transaction) (bool, error) {
hash := txn.Hash()

Expand All @@ -190,15 +192,19 @@ func (p *proposer) handleTransactionsFromPool() error {
} else if err == nil && txnData.Transaction() != nil {
p.logger.Trace().Stringer(logging.FieldTransactionHash, hash).
Msg("Transaction is already in the blockchain. Dropping...")

duplicates = append(duplicates, txn)
return false, nil
}

if res := execution.ValidateExternalTransaction(p.executionState, txn); res.FatalError != nil {
return false, res.FatalError
} else if res.Failed() {
p.logger.Error().Stringer(logging.FieldTransactionHash, hash).
Err(res.Error).Msg("External message validation failed")
p.logger.Warn().Stringer(logging.FieldTransactionHash, hash).
Err(res.Error).Msg("External txn validation failed. Saved failure receipt. Dropping...")

execution.AddFailureReceipt(hash, txn.To, res)
unverified = append(unverified, txn)
return false, nil
}

Expand All @@ -224,8 +230,24 @@ func (p *proposer) handleTransactionsFromPool() error {

p.proposal.InTxns = append(p.proposal.InTxns, txn)
}
}

p.proposal.RemoveFromPool = append(p.proposal.RemoveFromPool, txn)
if len(duplicates) > 0 {
p.logger.Debug().Msgf("Removing %d duplicate transactions from the pool", len(duplicates))

if err := p.pool.Discard(p.ctx, duplicates, txnpool.DuplicateHash); err != nil {
p.logger.Error().Err(err).
Msgf("Failed to remove %d duplicate transactions from the pool", len(duplicates))
}
}

if len(unverified) > 0 {
p.logger.Debug().Msgf("Removing %d unverifiable transactions from the pool", len(unverified))

if err := p.pool.Discard(p.ctx, unverified, txnpool.Unverified); err != nil {
p.logger.Error().Err(err).
Msgf("Failed to remove %d unverifiable transactions from the pool", len(unverified))
}
}

return nil
Expand Down
7 changes: 3 additions & 4 deletions nil/internal/collate/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/internal/execution"
"github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/txnpool"
"github.com/rs/zerolog"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -123,7 +124,6 @@ func (s *ProposerTestSuite) TestCollator() {
r1 := s.checkReceipt(shardId, m1)
r2 := s.checkReceipt(shardId, m2)
s.Equal(pool.Txns, proposal.InTxns)
s.Equal(pool.Txns, proposal.RemoveFromPool)

pool.Txns = nil

Expand Down Expand Up @@ -170,9 +170,8 @@ func (s *ProposerTestSuite) TestCollator() {
proposal := generateBlock()
s.Empty(proposal.InTxns)
s.Empty(proposal.ForwardTxns)
s.Equal(pool.Txns, proposal.RemoveFromPool)

pool.Txns = nil
s.Equal(pool.Txns, pool.LastDiscarded)
s.Equal(txnpool.DuplicateHash, pool.LastReason)
})

s.Run("Deploy", func() {
Expand Down
1 change: 1 addition & 0 deletions nil/internal/collate/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type TxnPool interface {
Peek(ctx context.Context, n int) ([]*types.Transaction, error)
Discard(ctx context.Context, txns []*types.Transaction, reason txnpool.DiscardReason) error
OnCommitted(ctx context.Context, committed []*types.Transaction) error
}

Expand Down
16 changes: 16 additions & 0 deletions nil/internal/collate/testaide.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,37 @@ import (
"context"

"github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/txnpool"
)

type MockTxnPool struct {
Txns []*types.Transaction

LastDiscarded []*types.Transaction
LastReason txnpool.DiscardReason
}

var _ TxnPool = (*MockTxnPool)(nil)

func (m *MockTxnPool) Reset() {
m.Txns = nil
m.LastDiscarded = nil
m.LastReason = 0
}

func (m *MockTxnPool) Peek(_ context.Context, n int) ([]*types.Transaction, error) {
if n > len(m.Txns) {
return m.Txns, nil
}
return m.Txns[:n], nil
}

func (m *MockTxnPool) Discard(_ context.Context, txns []*types.Transaction, reason txnpool.DiscardReason) error {
m.LastDiscarded = txns
m.LastReason = reason
return nil
}

func (m *MockTxnPool) OnCommitted(context.Context, []*types.Transaction) error {
return nil
}
5 changes: 3 additions & 2 deletions nil/internal/collate/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func (s *Validator) InsertProposal(ctx context.Context, proposal *execution.Prop
return fmt.Errorf("failed to generate block: %w", err)
}

if err := s.pool.OnCommitted(ctx, proposal.RemoveFromPool); err != nil {
s.logger.Warn().Err(err).Msgf("Failed to remove %d committed transactions from pool", len(proposal.RemoveFromPool))
if err := s.pool.OnCommitted(ctx, proposal.InTxns); err != nil {
s.logger.Warn().Err(err).
Msgf("Failed to remove %d committed transactions from pool", len(proposal.InTxns))
}

return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{
Expand Down
4 changes: 0 additions & 4 deletions nil/internal/execution/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ type Proposal struct {

InTxns []*types.Transaction `json:"inTxns" ssz-max:"4096"`
ForwardTxns []*types.Transaction `json:"forwardTxns" ssz-max:"4096"`

// In the future, collator should remove transactions from the pool itself after the consensus on the proposal.
// Currently, we need to remove them after the block was committed, or they may be lost.
RemoveFromPool []*types.Transaction `json:"removeFromPool" ssz-max:"4096"`
}

func NewEmptyProposal() *Proposal {
Expand Down
18 changes: 18 additions & 0 deletions nil/services/txnpool/txnpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type Pool interface {
Add(ctx context.Context, txns ...*types.Transaction) ([]DiscardReason, error)
Discard(ctx context.Context, txns []*types.Transaction, reason DiscardReason) error
OnCommitted(ctx context.Context, committed []*types.Transaction) error
// IdHashKnown check whether transaction with given Id hash is known to the pool
IdHashKnown(hash common.Hash) (bool, error)
Expand Down Expand Up @@ -287,6 +288,23 @@ func (p *TxnPool) discardLocked(mm *metaTxn, reason DiscardReason) {
p.all.delete(mm, reason)
}

func (p *TxnPool) Discard(_ context.Context, txns []*types.Transaction, reason DiscardReason) error {
p.lock.Lock()
defer p.lock.Unlock()

for _, txn := range txns {
mm := p.getLocked(txn.Hash())
if mm == nil {
continue
}

p.queue.Remove(mm)
p.discardLocked(mm, reason)
}

return nil
}

func (p *TxnPool) OnCommitted(_ context.Context, committed []*types.Transaction) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions nil/services/txnpool/txnpoolcfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
SeqnoTooLow DiscardReason = 18
NotReplaced DiscardReason = 20 // There was an existing transaction with the same sender and seqno, not enough price bump to replace
DuplicateHash DiscardReason = 21 // There was an existing transaction with the same hash
Unverified DiscardReason = 22 // Transaction verification failed
)

func (r DiscardReason) String() string {
Expand All @@ -60,6 +61,8 @@ func (r DiscardReason) String() string {
return "seqno too low"
case DuplicateHash:
return "duplicate hash"
case Unverified:
return "verification failed"
default:
panic(fmt.Sprintf("discard reason: %d", r))
}
Expand Down

0 comments on commit f0f1ef0

Please sign in to comment.