diff --git a/dcrdtest/go.mod b/dcrdtest/go.mod index 1e23aaa..86f1d28 100644 --- a/dcrdtest/go.mod +++ b/dcrdtest/go.mod @@ -21,6 +21,8 @@ require ( github.com/decred/dcrd/rpcclient/v8 v8.0.0 github.com/decred/dcrd/txscript/v4 v4.1.0 github.com/decred/dcrd/wire v1.6.0 + github.com/decred/slog v1.2.0 + matheusd.com/testctx v0.1.0 ) require ( @@ -42,7 +44,6 @@ require ( github.com/decred/dcrd/math/uint256 v1.0.1 // indirect github.com/decred/dcrd/peer/v3 v3.0.2 // indirect github.com/decred/go-socks v1.1.0 // indirect - github.com/decred/slog v1.2.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/jessevdk/go-flags v1.5.0 // indirect diff --git a/dcrdtest/go.sum b/dcrdtest/go.sum index b1ba2c1..0e39638 100644 --- a/dcrdtest/go.sum +++ b/dcrdtest/go.sum @@ -145,3 +145,5 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= +matheusd.com/testctx v0.1.0 h1:MBpaNuqr23ugnkA59gz8Bd6BQIGkvZr7M4vYAc/Apzc= +matheusd.com/testctx v0.1.0/go.mod h1:u9la0YA1XIBcEpTU/aHJ9q4/L0VttkwhkG2m4lrj7Ls= diff --git a/dcrdtest/memwallet.go b/dcrdtest/memwallet.go index 299019f..1b854f1 100644 --- a/dcrdtest/memwallet.go +++ b/dcrdtest/memwallet.go @@ -114,6 +114,12 @@ type memWallet struct { net *chaincfg.Params + // quit is closed when the harness node is stopped. + quit chan struct{} + + // wg tracks the mem wallet's goroutines. + wg sync.WaitGroup + rpc *rpcclient.Client sync.RWMutex @@ -164,14 +170,23 @@ func newMemWallet(net *chaincfg.Params, harnessID uint32) (*memWallet, error) { utxos: make(map[wire.OutPoint]*utxo), chainUpdateSignal: make(chan struct{}), reorgJournal: make(map[int64]*undoEntry), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, }, nil } // Start launches all goroutines required for the wallet to function properly. func (m *memWallet) Start() { + m.wg.Add(1) go m.chainSyncer() } +// Stop stops all goroutines required for the wallet to function properly. +func (m *memWallet) Stop() { + close(m.quit) + m.wg.Wait() +} + // SyncedHeight returns the height the wallet is known to be synced to. // // This function is safe for concurrent access. @@ -219,7 +234,10 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) { // available. We do this in a new goroutine in order to avoid blocking // the main loop of the rpc client. go func() { - m.chainUpdateSignal <- struct{}{} + select { + case m.chainUpdateSignal <- struct{}{}: + case <-m.quit: + } }() } @@ -230,10 +248,17 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) { func (m *memWallet) chainSyncer() { log.Tracef("memwallet.chainSyncer") defer log.Tracef("memwallet.chainSyncer exit") + defer m.wg.Done() var update *chainUpdate - for range m.chainUpdateSignal { + for { + select { + case <-m.chainUpdateSignal: + case <-m.quit: + return + } + // A new update is available, so pop the new chain update from // the front of the update queue. m.chainMtx.Lock() diff --git a/dcrdtest/node.go b/dcrdtest/node.go index 325a79d..cb1771b 100644 --- a/dcrdtest/node.go +++ b/dcrdtest/node.go @@ -24,6 +24,9 @@ import ( rpc "github.com/decred/dcrd/rpcclient/v8" ) +// errDcrdCmdExec is the error returned when the dcrd binary is not executed. +var errDcrdCmdExec = errors.New("unable to exec dcrd binary") + // nodeConfig contains all the args, and data required to launch a dcrd process // and connect the rpc client to it. type nodeConfig struct { @@ -201,7 +204,6 @@ func newNode(config *nodeConfig, dataDir string) (*node, error) { return &node{ config: config, dataDir: dataDir, - cmd: config.command(), }, nil } @@ -216,8 +218,10 @@ func (n *node) start(ctx context.Context) error { var pid sync.WaitGroup pid.Add(1) + cmd := n.config.command() + // Redirect stderr. - n.stderr, err = n.cmd.StderrPipe() + n.stderr, err = cmd.StderrPipe() if err != nil { return err } @@ -229,7 +233,10 @@ func (n *node) start(ctx context.Context) error { for { line, err := r.ReadBytes('\n') if errors.Is(err, io.EOF) { - log.Tracef("stderr: EOF") + n.logf("stderr: EOF") + return + } else if err != nil { + n.logf("stderr: Unable to read stderr: %v", err) return } n.logf("stderr: %s", line) @@ -237,7 +244,7 @@ func (n *node) start(ctx context.Context) error { }() // Redirect stdout. - n.stdout, err = n.cmd.StdoutPipe() + n.stdout, err = cmd.StdoutPipe() if err != nil { return err } @@ -249,7 +256,10 @@ func (n *node) start(ctx context.Context) error { for { line, err := r.ReadBytes('\n') if errors.Is(err, io.EOF) { - log.Tracef("stdout: EOF") + n.logf("stdout: EOF") + return + } else if err != nil { + n.logf("stdout: Unable to read stdout: %v", err) return } log.Tracef("stdout: %s", line) @@ -269,8 +279,10 @@ func (n *node) start(ctx context.Context) error { switch msg := msg.(type) { case boundP2PListenAddrEvent: p2pAddr = string(msg) + n.logf("P2P listen addr: %s", p2pAddr) case boundRPCListenAddrEvent: rpcAddr = string(msg) + n.logf("RPC listen addr: %s", rpcAddr) } if p2pAddr != "" && rpcAddr != "" { close(gotSubsysAddrs) @@ -283,33 +295,45 @@ func (n *node) start(ctx context.Context) error { for err == nil { _, err = nextIPCMessage(n.config.pipeRX.r) } + n.logf("IPC messages drained") }() // Launch command and store pid. - if err := n.cmd.Start(); err != nil { - return err + if err := cmd.Start(); err != nil { + // When failing to execute, wait until running goroutines are + // closed. + pid.Done() + n.wg.Wait() + n.config.pipeTX.close() + n.config.pipeRX.close() + return fmt.Errorf("%w: %v", errDcrdCmdExec, err) } + n.cmd = cmd n.pid = n.cmd.Process.Pid - // Unblock pipes now pid is available + // Unblock pipes now that pid is available. pid.Done() f, err := os.Create(filepath.Join(n.config.String(), "dcrd.pid")) if err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } n.pidFile = f.Name() if _, err = fmt.Fprintf(f, "%d\n", n.cmd.Process.Pid); err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } if err := f.Close(); err != nil { + _ = n.stop() // Cleanup what has been done so far. return err } // Read the RPC and P2P addresses. select { case <-ctx.Done(): + _ = n.stop() // Cleanup what has been done so far. return ctx.Err() case <-gotSubsysAddrs: n.p2pAddr = p2pAddr @@ -323,7 +347,7 @@ func (n *node) start(ctx context.Context) error { // properly. On windows, interrupt is not supported, so a kill signal is used // instead func (n *node) stop() error { - log.Tracef("stop %p %p", n.cmd, n.cmd.Process) + log.Tracef("stop %p", n.cmd) defer log.Tracef("stop done") if n.cmd == nil || n.cmd.Process == nil { @@ -366,6 +390,9 @@ func (n *node) stop() error { if err := n.config.pipeTX.close(); err != nil { n.logf("Unable to close pipe TX: %v", err) } + + // Mark command terminated. + n.cmd = nil return nil } diff --git a/dcrdtest/rpc_harness.go b/dcrdtest/rpc_harness.go index 3f0fd9e..36f0401 100644 --- a/dcrdtest/rpc_harness.go +++ b/dcrdtest/rpc_harness.go @@ -7,6 +7,7 @@ package dcrdtest import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -38,6 +39,8 @@ var ( // throughout the life of this package. pathToDCRD string pathToDCRDMtx sync.RWMutex + + errNilCoinbaseAddr = errors.New("memWallet coinbase addr is nil") ) // Harness fully encapsulates an active dcrd process to provide a unified @@ -218,7 +221,14 @@ func New(t *testing.T, activeNet *chaincfg.Params, handlers *rpcclient.Notificat // // NOTE: This method and TearDown should always be called from the same // goroutine as they are not concurrent safe. -func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) error { +func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) (err error) { + defer func() { + if err != nil { + tearErr := h.TearDown() + log.Warnf("Teardown error after setup error %v: %v", err, tearErr) + } + }() + // Start the dcrd node itself. This spawns a new process which will be // managed if err := h.node.start(ctx); err != nil { @@ -231,6 +241,9 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp // Filter transactions that pay to the coinbase associated with the // wallet. + if h.wallet.coinbaseAddr == nil { + return errNilCoinbaseAddr + } filterAddrs := []stdaddr.Address{h.wallet.coinbaseAddr} if err := h.Node.LoadTxFilter(ctx, true, filterAddrs, nil); err != nil { return err @@ -282,17 +295,28 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp // NOTE: This method and SetUp should always be called from the same goroutine // as they are not concurrent safe. func (h *Harness) TearDown() error { - log.Tracef("TearDown %p %p", h.Node, h.node) - defer log.Tracef("TearDown done") + log.Debugf("TearDown %p %p", h.Node, h.node) + defer log.Debugf("TearDown done") if h.Node != nil { - log.Tracef("TearDown: Node") + log.Debugf("TearDown: Node") h.Node.Shutdown() + h.Node = nil } - log.Tracef("TearDown: node") - if err := h.node.shutdown(); err != nil { - return err + if h.node != nil { + log.Debugf("TearDown: node") + node := h.node + h.node = nil + if err := node.shutdown(); err != nil { + return err + } + } + + log.Debugf("TearDown: wallet") + if h.wallet != nil { + h.wallet.Stop() + h.wallet = nil } return nil diff --git a/dcrdtest/rpc_harness_test.go b/dcrdtest/rpc_harness_test.go index b10f541..b29b5cc 100644 --- a/dcrdtest/rpc_harness_test.go +++ b/dcrdtest/rpc_harness_test.go @@ -6,8 +6,12 @@ package dcrdtest import ( + "bytes" "context" + "errors" "fmt" + "os" + "runtime/pprof" "testing" "time" @@ -16,10 +20,8 @@ import ( "github.com/decred/dcrd/dcrutil/v4" dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/wire" -) - -const ( - numMatureOutputs = 25 + "github.com/decred/slog" + "matheusd.com/testctx" ) func testSendOutputs(ctx context.Context, r *Harness, t *testing.T) { @@ -565,6 +567,8 @@ func testMemWalletLockedOutputs(ctx context.Context, r *Harness, t *testing.T) { } func TestHarness(t *testing.T) { + const numMatureOutputs = 25 + var err error mainHarness, err := New(t, chaincfg.RegNetParams(), nil, nil) if err != nil { @@ -575,12 +579,6 @@ func TestHarness(t *testing.T) { // 25 mature coinbases to allow spending from for testing purposes. ctx := context.Background() if err = mainHarness.SetUp(ctx, true, numMatureOutputs); err != nil { - // Even though the harness was not fully setup, it still needs - // to be torn down to ensure all resources such as temp - // directories are cleaned up. The error is intentionally - // ignored since this is already an error path and nothing else - // could be done about it anyways. - _ = mainHarness.TearDown() t.Fatalf("unable to setup test chain: %v", err) } @@ -658,3 +656,141 @@ func TestHarness(t *testing.T) { } } } + +// loggerWriter is an slog backend that writes to a test output. +type loggerWriter struct { + l testing.TB +} + +func (lw loggerWriter) Write(b []byte) (int, error) { + bt := bytes.TrimRight(b, "\r\n") + lw.l.Logf(string(bt)) + return len(b), nil +} + +// TestSetupTeardown tests that setting up and tearing down an rpc harness does +// not leak any goroutines. +func TestSetupTeardown(t *testing.T) { + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Perform the setup. + ctx := testctx.WithTimeout(t, time.Second*30) + if err = mainHarness.SetUp(ctx, true, 2); err != nil { + t.Fatalf("unable to setup test chain: %v", err) + } + + // Perform the teardown. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("unable to TearDown test chain: %v", err) + } + + // There should be only 2 goroutines live. + prof := pprof.Lookup("goroutine") + gotCount, wantCount := prof.Count(), 2 + if gotCount != wantCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + gotCount, wantCount) + } +} + +// TestSetupWithError tests that when the setup of an rpc harness fails, it +// cleanly tears down the harness without user intervention. +func TestSetupWithError(t *testing.T) { + // Keep track of how many goroutines are running before the test + // happens. + beforeCount := pprof.Lookup("goroutine").Count() + + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Reach into the wallet and set a nil coinbaseAddr. This will cause + // SetUp() to fail with a well known error. + mainHarness.wallet.coinbaseAddr = nil + + // Perform the setup. This should fail. + ctx := testctx.WithTimeout(t, time.Second*30) + if err := mainHarness.SetUp(ctx, true, 2); !errors.Is(err, errNilCoinbaseAddr) { + t.Fatalf("Unexpected error in Setup(): got %v, want %v", err, errNilCoinbaseAddr) + } + + // There should not be any new goroutines running. + prof := pprof.Lookup("goroutine") + afterCount := prof.Count() + if afterCount != beforeCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + afterCount, beforeCount) + } + + // Calling TearDown should not panic or error. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("Unexpected error during TearDown: %v", err) + } +} + +// TestSetupWithWrongDcrd tests that when the setup of an rpc harness fails due +// to an inexistent dcrd binary, it cleanly tears down the harness without user +// intervention. +func TestSetupWithWrongDcrd(t *testing.T) { + // Keep track of how many goroutines are running before the test + // happens. + beforeCount := pprof.Lookup("goroutine").Count() + + // Add logging to ease debugging this test. + lw := loggerWriter{l: t} + bknd := slog.NewBackend(lw) + UseLogger(bknd.Logger("TEST")) + log.SetLevel(slog.LevelDebug) + defer UseLogger(slog.Disabled) + + SetPathToDCRD("/path/to/dcrd/that/does/not/exist") + defer SetPathToDCRD("") + + params := chaincfg.RegNetParams() + mainHarness, err := New(t, params, nil, nil) + if err != nil { + t.Fatalf("unable to create main harness: %v", err) + } + + // Perform the setup. This should fail. + ctx := testctx.WithTimeout(t, time.Second*30) + if err := mainHarness.SetUp(ctx, true, 2); !errors.Is(err, errDcrdCmdExec) { + t.Fatalf("Unexpected error in Setup(): got %v, want %v", err, errDcrdCmdExec) + } + + // There should not be any new goroutines running. + prof := pprof.Lookup("goroutine") + afterCount := prof.Count() + if afterCount != beforeCount { + prof.WriteTo(os.Stderr, 1) + t.Fatalf("Unexpected nb of active goroutines: got %d, want %d", + afterCount, beforeCount) + } + + // Calling TearDown should not panic or error. + if err := mainHarness.TearDown(); err != nil { + t.Fatalf("Unexpected error during TearDown: %v", err) + } +} diff --git a/dcrtest.work.sum b/dcrtest.work.sum new file mode 100644 index 0000000..1dfd445 --- /dev/null +++ b/dcrtest.work.sum @@ -0,0 +1,2 @@ +github.com/decred/dcrd/blockchain/v5 v5.0.0/go.mod h1:Khd1xIrEmstkaclyfvf72ecn9KIv4CjEawqBH+icTbA= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=