diff --git a/nil/internal/collate/proposer.go b/nil/internal/collate/proposer.go index 5d0591b33..f0a3fa001 100644 --- a/nil/internal/collate/proposer.go +++ b/nil/internal/collate/proposer.go @@ -13,6 +13,7 @@ import ( "github.com/NilFoundation/nil/nil/internal/db" "github.com/NilFoundation/nil/nil/internal/execution" "github.com/NilFoundation/nil/nil/internal/types" + "github.com/NilFoundation/nil/nil/services/txnpool" "github.com/rs/zerolog" ) @@ -181,6 +182,7 @@ func (p *proposer) handleTransactionsFromPool() error { sa := execution.NewStateAccessor() + var duplicates, unverified []*types.Transaction handle := func(txn *types.Transaction) (bool, error) { hash := txn.Hash() @@ -190,15 +192,19 @@ func (p *proposer) handleTransactionsFromPool() error { } else if err == nil && txnData.Transaction() != nil { p.logger.Trace().Stringer(logging.FieldTransactionHash, hash). Msg("Transaction is already in the blockchain. Dropping...") + + duplicates = append(duplicates, txn) return false, nil } if res := execution.ValidateExternalTransaction(p.executionState, txn); res.FatalError != nil { return false, res.FatalError } else if res.Failed() { - p.logger.Error().Stringer(logging.FieldTransactionHash, hash). - Err(res.Error).Msg("External message validation failed") + p.logger.Warn().Stringer(logging.FieldTransactionHash, hash). + Err(res.Error).Msg("External txn validation failed. Saved failure receipt. Dropping...") + execution.AddFailureReceipt(hash, txn.To, res) + unverified = append(unverified, txn) return false, nil } @@ -224,8 +230,24 @@ func (p *proposer) handleTransactionsFromPool() error { p.proposal.InTxns = append(p.proposal.InTxns, txn) } + } - p.proposal.RemoveFromPool = append(p.proposal.RemoveFromPool, txn) + if len(duplicates) > 0 { + p.logger.Debug().Msgf("Removing %d duplicate transactions from the pool", len(duplicates)) + + if err := p.pool.Discard(p.ctx, duplicates, txnpool.DuplicateHash); err != nil { + p.logger.Error().Err(err). + Msgf("Failed to remove %d duplicate transactions from the pool", len(duplicates)) + } + } + + if len(unverified) > 0 { + p.logger.Debug().Msgf("Removing %d unverifiable transactions from the pool", len(unverified)) + + if err := p.pool.Discard(p.ctx, unverified, txnpool.Unverified); err != nil { + p.logger.Error().Err(err). + Msgf("Failed to remove %d unverifiable transactions from the pool", len(unverified)) + } } return nil diff --git a/nil/internal/collate/proposer_test.go b/nil/internal/collate/proposer_test.go index 65dd68041..eaed74202 100644 --- a/nil/internal/collate/proposer_test.go +++ b/nil/internal/collate/proposer_test.go @@ -10,6 +10,7 @@ import ( "github.com/NilFoundation/nil/nil/internal/db" "github.com/NilFoundation/nil/nil/internal/execution" "github.com/NilFoundation/nil/nil/internal/types" + "github.com/NilFoundation/nil/nil/services/txnpool" "github.com/rs/zerolog" "github.com/stretchr/testify/suite" ) @@ -123,7 +124,6 @@ func (s *ProposerTestSuite) TestCollator() { r1 := s.checkReceipt(shardId, m1) r2 := s.checkReceipt(shardId, m2) s.Equal(pool.Txns, proposal.InTxns) - s.Equal(pool.Txns, proposal.RemoveFromPool) pool.Txns = nil @@ -170,9 +170,8 @@ func (s *ProposerTestSuite) TestCollator() { proposal := generateBlock() s.Empty(proposal.InTxns) s.Empty(proposal.ForwardTxns) - s.Equal(pool.Txns, proposal.RemoveFromPool) - - pool.Txns = nil + s.Equal(pool.Txns, pool.LastDiscarded) + s.Equal(txnpool.DuplicateHash, pool.LastReason) }) s.Run("Deploy", func() { diff --git a/nil/internal/collate/scheduler.go b/nil/internal/collate/scheduler.go index 26227d4de..f14bdf88d 100644 --- a/nil/internal/collate/scheduler.go +++ b/nil/internal/collate/scheduler.go @@ -17,6 +17,7 @@ import ( type TxnPool interface { Peek(ctx context.Context, n int) ([]*types.Transaction, error) + Discard(ctx context.Context, txns []*types.Transaction, reason txnpool.DiscardReason) error OnCommitted(ctx context.Context, committed []*types.Transaction) error } diff --git a/nil/internal/collate/testaide.go b/nil/internal/collate/testaide.go index a138c50a0..f66220259 100644 --- a/nil/internal/collate/testaide.go +++ b/nil/internal/collate/testaide.go @@ -6,14 +6,24 @@ import ( "context" "github.com/NilFoundation/nil/nil/internal/types" + "github.com/NilFoundation/nil/nil/services/txnpool" ) type MockTxnPool struct { Txns []*types.Transaction + + LastDiscarded []*types.Transaction + LastReason txnpool.DiscardReason } var _ TxnPool = (*MockTxnPool)(nil) +func (m *MockTxnPool) Reset() { + m.Txns = nil + m.LastDiscarded = nil + m.LastReason = 0 +} + func (m *MockTxnPool) Peek(_ context.Context, n int) ([]*types.Transaction, error) { if n > len(m.Txns) { return m.Txns, nil @@ -21,6 +31,12 @@ func (m *MockTxnPool) Peek(_ context.Context, n int) ([]*types.Transaction, erro return m.Txns[:n], nil } +func (m *MockTxnPool) Discard(_ context.Context, txns []*types.Transaction, reason txnpool.DiscardReason) error { + m.LastDiscarded = txns + m.LastReason = reason + return nil +} + func (m *MockTxnPool) OnCommitted(context.Context, []*types.Transaction) error { return nil } diff --git a/nil/internal/collate/validator.go b/nil/internal/collate/validator.go index 6cd046a33..f03a569cd 100644 --- a/nil/internal/collate/validator.go +++ b/nil/internal/collate/validator.go @@ -56,8 +56,9 @@ func (s *Validator) InsertProposal(ctx context.Context, proposal *execution.Prop return fmt.Errorf("failed to generate block: %w", err) } - if err := s.pool.OnCommitted(ctx, proposal.RemoveFromPool); err != nil { - s.logger.Warn().Err(err).Msgf("Failed to remove %d committed transactions from pool", len(proposal.RemoveFromPool)) + if err := s.pool.OnCommitted(ctx, proposal.InTxns); err != nil { + s.logger.Warn().Err(err). + Msgf("Failed to remove %d committed transactions from pool", len(proposal.InTxns)) } return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{ diff --git a/nil/internal/execution/block_generator.go b/nil/internal/execution/block_generator.go index 34a12deb5..7fef1e14a 100644 --- a/nil/internal/execution/block_generator.go +++ b/nil/internal/execution/block_generator.go @@ -33,10 +33,6 @@ type Proposal struct { InTxns []*types.Transaction `json:"inTxns" ssz-max:"4096"` ForwardTxns []*types.Transaction `json:"forwardTxns" ssz-max:"4096"` - - // In the future, collator should remove transactions from the pool itself after the consensus on the proposal. - // Currently, we need to remove them after the block was committed, or they may be lost. - RemoveFromPool []*types.Transaction `json:"removeFromPool" ssz-max:"4096"` } func NewEmptyProposal() *Proposal { diff --git a/nil/services/txnpool/txnpool.go b/nil/services/txnpool/txnpool.go index 4f612449c..0f4ab7c96 100644 --- a/nil/services/txnpool/txnpool.go +++ b/nil/services/txnpool/txnpool.go @@ -16,6 +16,7 @@ import ( type Pool interface { Add(ctx context.Context, txns ...*types.Transaction) ([]DiscardReason, error) + Discard(ctx context.Context, txns []*types.Transaction, reason DiscardReason) error OnCommitted(ctx context.Context, committed []*types.Transaction) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(hash common.Hash) (bool, error) @@ -287,6 +288,23 @@ func (p *TxnPool) discardLocked(mm *metaTxn, reason DiscardReason) { p.all.delete(mm, reason) } +func (p *TxnPool) Discard(_ context.Context, txns []*types.Transaction, reason DiscardReason) error { + p.lock.Lock() + defer p.lock.Unlock() + + for _, txn := range txns { + mm := p.getLocked(txn.Hash()) + if mm == nil { + continue + } + + p.queue.Remove(mm) + p.discardLocked(mm, reason) + } + + return nil +} + func (p *TxnPool) OnCommitted(_ context.Context, committed []*types.Transaction) (err error) { p.lock.Lock() defer p.lock.Unlock() diff --git a/nil/services/txnpool/txnpoolcfg.go b/nil/services/txnpool/txnpoolcfg.go index 152bb784e..ec0097243 100644 --- a/nil/services/txnpool/txnpoolcfg.go +++ b/nil/services/txnpool/txnpoolcfg.go @@ -34,6 +34,7 @@ const ( SeqnoTooLow DiscardReason = 18 NotReplaced DiscardReason = 20 // There was an existing transaction with the same sender and seqno, not enough price bump to replace DuplicateHash DiscardReason = 21 // There was an existing transaction with the same hash + Unverified DiscardReason = 22 // Transaction verification failed ) func (r DiscardReason) String() string { @@ -60,6 +61,8 @@ func (r DiscardReason) String() string { return "seqno too low" case DuplicateHash: return "duplicate hash" + case Unverified: + return "verification failed" default: panic(fmt.Sprintf("discard reason: %d", r)) }