Skip to content

Commit

Permalink
diagnostics: stream all incoming transaction to diagnostics (#13885)
Browse files Browse the repository at this point in the history
Sending all incoming transactions to diagnostics in order to be streamed
to the UI. Alongside with these transactions also sending hashes of
duplicated transactions `knownTxns` it will show how many duplicates
wandering around. This is the first implementation so in future object
structure will be updated.

Closes #13400
  • Loading branch information
dvovk authored Feb 24, 2025
1 parent a0706cc commit 880c523
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
33 changes: 26 additions & 7 deletions erigon-lib/diagnostics/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,37 @@ package diagnostics
import (
"context"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/holiman/uint256"
)

type DiagTxn struct {
Hash []byte `json:"hash"`
IDHash [32]byte `json:"hash"`
SenderID uint64 `json:"senderID"`
Nonce uint64 `json:"nonce"`
Value uint256.Int `json:"value"`
Gas uint64 `json:"gas"`
FeeCap uint256.Int `json:"feeCap"`
Tip uint256.Int `json:"tip"`
Size uint32 `json:"size"`
Type byte `json:"type"`
Creation bool `json:"creation"`
DataLen int `json:"dataLen"`
AccessListAddrCount int `json:"accessListAddrCount"`
AccessListStorCount int `json:"accessListStorCount"`
BlobHashes []common.Hash `json:"blobHashes"`
Blobs [][]byte `json:"blobs"`
}

type IncommingTxnUpdate struct {
Txn DiagTxn `json:"txns"`
type IncomingTxnUpdate struct {
Txns []DiagTxn `json:"txns"`
Senders []byte `json:"senders"`
IsLocal []bool `json:"isLocal"`
KnownTxns [][]byte `json:"knownTxns"` //hashes of incomming transactions from p2p network which are already in the pool
}

func (ti IncommingTxnUpdate) Type() Type {
func (ti IncomingTxnUpdate) Type() Type {
return TypeOf(ti)
}

Expand All @@ -41,18 +60,18 @@ func (d *DiagnosticClient) setupTxPoolDiagnostics(rootCtx context.Context) {

func (d *DiagnosticClient) runOnIncommingTxnListener(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[IncommingTxnUpdate](rootCtx, 1)
ctx, ch, closeChannel := Context[IncomingTxnUpdate](rootCtx, 1)
defer closeChannel()

StartProviders(ctx, TypeOf(IncommingTxnUpdate{}), log.Root())
StartProviders(ctx, TypeOf(IncomingTxnUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
d.Notify(DiagMessages{
MessageType: "txpool",
Message: string(info.Txn.Hash),
Message: info,
})
}
}
Expand Down
5 changes: 2 additions & 3 deletions turbo/app/support_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,10 @@ func (nc *nodeConnection) startListening() {
return
}

fmt.Println("Received message, ", string(message))
nc.responseChannel <- nodeResponse{
Id: nc.requestId,
Result: message,
Last: true,
Last: false,
}
}
}
Expand Down Expand Up @@ -530,7 +529,7 @@ func (nc *nodeConnection) processRequests(metricsClient *http.Client) {
nc.responseChannel <- nodeResponse{
Id: action.requestId,
Result: json.RawMessage(bytes.Bytes()),
Last: true,
Last: false,
}

case isUnsubscribe(action.method):
Expand Down
32 changes: 32 additions & 0 deletions txnprovider/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/gointerfaces/grpcutil"
remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
Expand Down Expand Up @@ -323,6 +324,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}
case sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_66:
txns := TxnSlots{}
knownTxns := [][]byte{}
if err := f.threadSafeParsePooledTxn(func(parseContext *TxnParseContext) error {
return nil
}); err != nil {
Expand All @@ -338,6 +340,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
if known {
knownTxns = append(knownTxns, hash)
return ErrRejected
}
return nil
Expand All @@ -356,6 +359,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
if known {
knownTxns = append(knownTxns, hash)
return ErrRejected
}
return nil
Expand All @@ -372,6 +376,34 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
if len(txns.Txns) == 0 {
return nil
}

diagTxns := make([]diagnostics.DiagTxn, len(txns.Txns))
for i, txn := range txns.Txns {
diagTxns[i] = diagnostics.DiagTxn{
IDHash: txn.IDHash,
SenderID: txn.SenderID,
Nonce: txn.Nonce,
Value: txn.Value,
Gas: txn.Gas,
FeeCap: txn.FeeCap,
Tip: txn.Tip,
Size: txn.Size,
Type: txn.Type,
Creation: txn.Creation,
DataLen: txn.DataLen,
AccessListAddrCount: txn.AccessListAddrCount,
AccessListStorCount: txn.AccessListStorCount,
BlobHashes: txn.BlobHashes,
Blobs: txn.Blobs,
}
}

diagnostics.Send(diagnostics.IncomingTxnUpdate{
Txns: diagTxns,
Senders: txns.Senders,
IsLocal: txns.IsLocal,
KnownTxns: knownTxns,
})
f.pool.AddRemoteTxns(ctx, txns)
default:
defer f.logger.Trace("[txpool] dropped p2p message", "id", req.Id)
Expand Down

0 comments on commit 880c523

Please sign in to comment.