From 5f326da553794982857db438c322158383f301a1 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Wed, 18 Oct 2023 12:09:49 -0300 Subject: [PATCH] dcrdtest: Automatic TearDown on SetUp failures This adds an automatic TearDown() call for an rpc harness that fails to be setup, so that callers do not need to remember to do it themselves. This improves usability of the package by ensuring goroutines do not leak after either a failed SetUp() or a TearDown(). Unit tests are added that assert the correct behavior on both a successfull setup and when certain specific setup calls fail. This also fixes a goroutine leak in the internal memory wallet. --- dcrdtest/go.mod | 3 +- dcrdtest/go.sum | 2 + dcrdtest/memwallet.go | 29 ++++++- dcrdtest/node.go | 45 ++++++++-- dcrdtest/rpc_harness.go | 38 +++++++-- dcrdtest/rpc_harness_test.go | 156 ++++++++++++++++++++++++++++++++--- dcrtest.work.sum | 2 + 7 files changed, 246 insertions(+), 29 deletions(-) create mode 100644 dcrtest.work.sum diff --git a/dcrdtest/go.mod b/dcrdtest/go.mod index 1e23aaa..86f1d28 100644 --- a/dcrdtest/go.mod +++ b/dcrdtest/go.mod @@ -21,6 +21,8 @@ require ( github.com/decred/dcrd/rpcclient/v8 v8.0.0 github.com/decred/dcrd/txscript/v4 v4.1.0 github.com/decred/dcrd/wire v1.6.0 + github.com/decred/slog v1.2.0 + matheusd.com/testctx v0.1.0 ) require ( @@ -42,7 +44,6 @@ require ( github.com/decred/dcrd/math/uint256 v1.0.1 // indirect github.com/decred/dcrd/peer/v3 v3.0.2 // indirect github.com/decred/go-socks v1.1.0 // indirect - github.com/decred/slog v1.2.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/jessevdk/go-flags v1.5.0 // indirect diff --git a/dcrdtest/go.sum b/dcrdtest/go.sum index b1ba2c1..0e39638 100644 --- a/dcrdtest/go.sum +++ b/dcrdtest/go.sum @@ -145,3 +145,5 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= +matheusd.com/testctx v0.1.0 h1:MBpaNuqr23ugnkA59gz8Bd6BQIGkvZr7M4vYAc/Apzc= +matheusd.com/testctx v0.1.0/go.mod h1:u9la0YA1XIBcEpTU/aHJ9q4/L0VttkwhkG2m4lrj7Ls= diff --git a/dcrdtest/memwallet.go b/dcrdtest/memwallet.go index 299019f..1b854f1 100644 --- a/dcrdtest/memwallet.go +++ b/dcrdtest/memwallet.go @@ -114,6 +114,12 @@ type memWallet struct { net *chaincfg.Params + // quit is closed when the harness node is stopped. + quit chan struct{} + + // wg tracks the mem wallet's goroutines. + wg sync.WaitGroup + rpc *rpcclient.Client sync.RWMutex @@ -164,14 +170,23 @@ func newMemWallet(net *chaincfg.Params, harnessID uint32) (*memWallet, error) { utxos: make(map[wire.OutPoint]*utxo), chainUpdateSignal: make(chan struct{}), reorgJournal: make(map[int64]*undoEntry), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, }, nil } // Start launches all goroutines required for the wallet to function properly. func (m *memWallet) Start() { + m.wg.Add(1) go m.chainSyncer() } +// Stop stops all goroutines required for the wallet to function properly. +func (m *memWallet) Stop() { + close(m.quit) + m.wg.Wait() +} + // SyncedHeight returns the height the wallet is known to be synced to. // // This function is safe for concurrent access. @@ -219,7 +234,10 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) { // available. We do this in a new goroutine in order to avoid blocking // the main loop of the rpc client. go func() { - m.chainUpdateSignal <- struct{}{} + select { + case m.chainUpdateSignal <- struct{}{}: + case <-m.quit: + } }() } @@ -230,10 +248,17 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) { func (m *memWallet) chainSyncer() { log.Tracef("memwallet.chainSyncer") defer log.Tracef("memwallet.chainSyncer exit") + defer m.wg.Done() var update *chainUpdate - for range m.chainUpdateSignal { + for { + select { + case <-m.chainUpdateSignal: + case <-m.quit: + return + } + // A new update is available, so pop the new chain update from // the front of the update queue. m.chainMtx.Lock() diff --git a/dcrdtest/node.go b/dcrdtest/node.go index 325a79d..cb1771b 100644 --- a/dcrdtest/node.go +++ b/dcrdtest/node.go @@ -24,6 +24,9 @@ import ( rpc "github.com/decred/dcrd/rpcclient/v8" ) +// errDcrdCmdExec is the error returned when the dcrd binary is not executed. +var errDcrdCmdExec = errors.New("unable to exec dcrd binary") + // nodeConfig contains all the args, and data required to launch a dcrd process // and connect the rpc client to it. type nodeConfig struct { @@ -201,7 +204,6 @@ func newNode(config *nodeConfig, dataDir string) (*node, error) { return &node{ config: config, dataDir: dataDir, - cmd: config.command(), }, nil } @@ -216,8 +218,10 @@ func (n *node) start(ctx context.Context) error { var pid sync.WaitGroup pid.Add(1) + cmd := n.config.command() + // Redirect stderr. - n.stderr, err = n.cmd.StderrPipe() + n.stderr, err = cmd.StderrPipe() if err != nil { return err } @@ -229,7 +233,10 @@ func (n *node) start(ctx context.Context) error { for { line, err := r.ReadBytes('\n') if errors.Is(err, io.EOF) { - log.Tracef("stderr: EOF") + n.logf("stderr: EOF") + return + } else if err != nil { + n.logf("stderr: Unable to read stderr: %v", err) return } n.logf("stderr: %s", line) @@ -237,7 +244,7 @@ func (n *node) start(ctx context.Context) error { }() // Redirect stdout. - n.stdout, err = n.cmd.StdoutPipe() + n.stdout, err = cmd.StdoutPipe() if err != nil { return err } @@ -249,7 +256,10 @@ func (n *node) start(ctx context.Context) error { for { line, err := r.ReadBytes('\n') if errors.Is(err, io.EOF) { - log.Tracef("stdout: EOF") + n.logf("stdout: EOF") + return + } else if err != nil { + n.logf("stdout: Unable to read stdout: %v", err) return } log.Tracef("stdout: %s", line) @@ -269,8 +279,10 @@ func (n *node) start(ctx context.Context) error { switch msg := msg.(type) { case boundP2PListenAddrEvent: p2pAddr = string(msg) + n.logf("P2P listen addr: %s", p2pAddr) case boundRPCListenAddrEvent: rpcAddr = string(msg) + n.logf("RPC listen addr: %s", rpcAddr) } if p2pAddr != "" && rpcAddr != "" { close(gotSubsysAddrs) @@ -283,33 +295,45 @@ func (n *node) start(ctx context.Context) error { for err == nil { _, err = nextIPCMessage(n.config.pipeRX.r) } + n.logf("IPC messages drained") }() // Launch command and store pid. - if err := n.cmd.Start(); err != nil { - return err + if err := cmd.Start(); err != nil { + // When failing to execute, wait until running goroutines are + // closed. + pid.Done() + n.wg.Wait() + n.config.pipeTX.close() + n.config.pipeRX.close() + return fmt.Errorf("%w: %v", errDcrdCmdExec, err) } + n.cmd = cmd n.pid = n.cmd.Process.Pid - // Unblock pipes now pid is available + // Unblock pipes now that pid is available. pid.Done() f, err := os.Create(filepath.Join(n.config.String(), "dcrd.pid")) if err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } n.pidFile = f.Name() if _, err = fmt.Fprintf(f, "%d\n", n.cmd.Process.Pid); err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } if err := f.Close(); err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } // Read the RPC and P2P addresses. select { case <-ctx.Done(): + _ = n.stop() // Cleanup what has been done so far. return ctx.Err() case <-gotSubsysAddrs: n.p2pAddr = p2pAddr @@ -323,7 +347,7 @@ func (n *node) start(ctx context.Context) error { // properly. On windows, interrupt is not supported, so a kill signal is used // instead func (n *node) stop() error { - log.Tracef("stop %p %p", n.cmd, n.cmd.Process) + log.Tracef("stop %p", n.cmd) defer log.Tracef("stop done") if n.cmd == nil || n.cmd.Process == nil { @@ -366,6 +390,9 @@ func (n *node) stop() error { if err := n.config.pipeTX.close(); err != nil { n.logf("Unable to close pipe TX: %v", err) } + + // Mark command terminated. + n.cmd = nil return nil } diff --git a/dcrdtest/rpc_harness.go b/dcrdtest/rpc_harness.go index 3f0fd9e..36f0401 100644 --- a/dcrdtest/rpc_harness.go +++ b/dcrdtest/rpc_harness.go @@ -7,6 +7,7 @@ package dcrdtest import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -38,6 +39,8 @@ var ( // throughout the life of this package. pathToDCRD string pathToDCRDMtx sync.RWMutex + + errNilCoinbaseAddr = errors.New("memWallet coinbase addr is nil") ) // Harness fully encapsulates an active dcrd process to provide a unified @@ -218,7 +221,14 @@ func New(t *testing.T, activeNet *chaincfg.Params, handlers *rpcclient.Notificat // // NOTE: This method and TearDown should always be called from the same // goroutine as they are not concurrent safe. -func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) error { +func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) (err error) { + defer func() { + if err != nil { + tearErr := h.TearDown() + log.Warnf("Teardown error after setup error %v: %v", err, tearErr) + } + }() + // Start the dcrd node itself. This spawns a new process which will be // managed if err := h.node.start(ctx); err != nil { @@ -231,6 +241,9 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp // Filter transactions that pay to the coinbase associated with the // wallet. + if h.wallet.coinbaseAddr == nil { + return errNilCoinbaseAddr + } filterAddrs := []stdaddr.Address{h.wallet.coinbaseAddr} if err := h.Node.LoadTxFilter(ctx, true, filterAddrs, nil); err != nil { return err @@ -282,17 +295,28 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp // NOTE: This method and SetUp should always be called from the same goroutine // as they are not concurrent safe. func (h *Harness) TearDown() error { - log.Tracef("TearDown %p %p", h.Node, h.node) - defer log.Tracef("TearDown done") + log.Debugf("TearDown %p %p", h.Node, h.node) + defer log.Debugf("TearDown done") if h.Node != nil { - log.Tracef("TearDown: Node") + log.Debugf("TearDown: Node") h.Node.Shutdown() + h.Node = nil } - log.Tracef("TearDown: node") - if err := h.node.shutdown(); err != nil { - return err + if h.node != nil { + log.Debugf("TearDown: node") + node := h.node + h.node = nil + if err := node.shutdown(); err != nil { + return err + } + } + + log.Debugf("TearDown: wallet") + if h.wallet != nil { + h.wallet.Stop() + h.wallet = nil } return nil diff --git a/dcrdtest/rpc_harness_test.go b/dcrdtest/rpc_harness_test.go index b10f541..b29b5cc 100644 --- a/dcrdtest/rpc_harness_test.go +++ b/dcrdtest/rpc_harness_test.go @@ -6,8 +6,12 @@ package dcrdtest import ( + "bytes" "context" + "errors" "fmt" + "os" + "runtime/pprof" "testing" "time" @@ -16,10 +20,8 @@ import ( "github.com/decred/dcrd/dcrutil/v4" dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/wire" -) - -const ( - numMatureOutputs = 25 + "github.com/decred/slog" + "matheusd.com/testctx" ) func testSendOutputs(ctx context.Context, r *Harness, t *testing.T) { @@ -565,6 +567,8 @@ func testMemWalletLockedOutputs(ctx context.Context, r *Harness, t *testing.T) { } func TestHarness(t *testing.T) { + const numMatureOutputs = 25 + var err error mainHarness, err := New(t, chaincfg.RegNetParams(), nil, nil) if err != nil { @@ -575,12 +579,6 @@ func TestHarness(t *testing.T) { // 25 mature coinbases to allow spending from for testing purposes. ctx := context.Background() if err = mainHarness.SetUp(ctx, true, numMatureOutputs); err != nil { - // Even though the harness was not fully setup, it still needs - // to be torn down to ensure all resources such as temp - // directories are cleaned up. The error is intentionally - // ignored since this is already an error path and nothing else - // could be done about it anyways. - _ = mainHarness.TearDown() t.Fatalf("unable to setup test chain: %v", err) } @@ -658,3 +656,141 @@ func TestHarness(t *testing.T) { } } } + +// loggerWriter is an slog backend that writes to a test output. +type loggerWriter struct { + l testing.TB +} + +func (lw loggerWriter) Write(b []byte) (int, error) { + bt := bytes.TrimRight(b, "\r\n") + lw.l.Logf(string(bt)) + return len(b), nil +} + +// TestSetupTeardown tests that setting up and tearing down an rpc harness does +// not leak any goroutines. +func TestSetupTeardown(t *testing.T) { + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Perform the setup. + ctx := testctx.WithTimeout(t, time.Second*30) + if err = mainHarness.SetUp(ctx, true, 2); err != nil { + t.Fatalf("unable to setup test chain: %v", err) + } + + // Perform the teardown. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("unable to TearDown test chain: %v", err) + } + + // There should be only 2 goroutines live. + prof := pprof.Lookup("goroutine") + gotCount, wantCount := prof.Count(), 2 + if gotCount != wantCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + gotCount, wantCount) + } +} + +// TestSetupWithError tests that when the setup of an rpc harness fails, it +// cleanly tears down the harness without user intervention. +func TestSetupWithError(t *testing.T) { + // Keep track of how many goroutines are running before the test + // happens. + beforeCount := pprof.Lookup("goroutine").Count() + + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Reach into the wallet and set a nil coinbaseAddr. This will cause + // SetUp() to fail with a well known error. + mainHarness.wallet.coinbaseAddr = nil + + // Perform the setup. This should fail. + ctx := testctx.WithTimeout(t, time.Second*30) + if err := mainHarness.SetUp(ctx, true, 2); !errors.Is(err, errNilCoinbaseAddr) { + t.Fatalf("Unexpected error in Setup(): got %v, want %v", err, errNilCoinbaseAddr) + } + + // There should not be any new goroutines running. + prof := pprof.Lookup("goroutine") + afterCount := prof.Count() + if afterCount != beforeCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + afterCount, beforeCount) + } + + // Calling TearDown should not panic or error. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("Unexpected error during TearDown: %v", err) + } +} + +// TestSetupWithWrongDcrd tests that when the setup of an rpc harness fails due +// to an inexistent dcrd binary, it cleanly tears down the harness without user +// intervention. +func TestSetupWithWrongDcrd(t *testing.T) { + // Keep track of how many goroutines are running before the test + // happens. + beforeCount := pprof.Lookup("goroutine").Count() + + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + SetPathToDCRD("/path/to/dcrd/that/does/not/exist") + defer SetPathToDCRD("") + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Perform the setup. This should fail. + ctx := testctx.WithTimeout(t, time.Second*30) + if err := mainHarness.SetUp(ctx, true, 2); !errors.Is(err, errDcrdCmdExec) { + t.Fatalf("Unexpected error in Setup(): got %v, want %v", err, errDcrdCmdExec) + } + + // There should not be any new goroutines running. + prof := pprof.Lookup("goroutine") + afterCount := prof.Count() + if afterCount != beforeCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + afterCount, beforeCount) + } + + // Calling TearDown should not panic or error. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("Unexpected error during TearDown: %v", err) + } +} diff --git a/dcrtest.work.sum b/dcrtest.work.sum new file mode 100644 index 0000000..1dfd445 --- /dev/null +++ b/dcrtest.work.sum @@ -0,0 +1,2 @@ +github.com/decred/dcrd/blockchain/v5 v5.0.0/go.mod h1:Khd1xIrEmstkaclyfvf72ecn9KIv4CjEawqBH+icTbA= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=