Skip to content

Commit 3e36856

Browse files
committed
sweepbatcher: re-add sweeps after fully confirmed
In case of a reorg sweeps should not go to another batch but stay in the current batch until it is fully confirmed. Only after that the remaining sweeps are re-added to another batch. Field sweep.completed is now set to true only for fully-confirmed sweeps.
1 parent 4bd840f commit 3e36856

File tree

4 files changed

+134
-113
lines changed

4 files changed

+134
-113
lines changed

sweepbatcher/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ type dbSweep struct {
248248
// Amount is the amount of the sweep.
249249
Amount btcutil.Amount
250250

251-
// Completed indicates whether this sweep is completed.
251+
// Completed indicates whether this sweep is fully-confirmed.
252252
Completed bool
253253
}
254254

sweepbatcher/sweep_batch.go

+122-99
Original file line numberDiff line numberDiff line change
@@ -1933,7 +1933,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep,
19331933
func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19341934
var (
19351935
txHash = spendTx.TxHash()
1936-
purgeList = make([]SweepRequest, 0, len(b.sweeps))
19371936
notifyList = make([]sweep, 0, len(b.sweeps))
19381937
)
19391938
b.batchTxid = &txHash
@@ -1943,7 +1942,100 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19431942
b.Warnf("transaction %v has no outputs", txHash)
19441943
}
19451944

1946-
// Determine if we should use presigned mode for the batch.
1945+
// Make a set of confirmed sweeps.
1946+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
1947+
for _, txIn := range spendTx.TxIn {
1948+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
1949+
}
1950+
1951+
// As a previous version of the batch transaction may get confirmed,
1952+
// which does not contain the latest sweeps, we need to detect the
1953+
// sweeps that did not make it to the confirmed transaction and feed
1954+
// them back to the batcher. This will ensure that the sweeps will enter
1955+
// a new batch instead of remaining dangling.
1956+
var (
1957+
totalSweptAmt btcutil.Amount
1958+
confirmedSweeps = []wire.OutPoint{}
1959+
)
1960+
for _, sweep := range b.sweeps {
1961+
// Skip sweeps that were not included into the confirmed tx.
1962+
_, found := confirmedSet[sweep.outpoint]
1963+
if !found {
1964+
continue
1965+
}
1966+
1967+
totalSweptAmt += sweep.value
1968+
notifyList = append(notifyList, sweep)
1969+
confirmedSweeps = append(confirmedSweeps, sweep.outpoint)
1970+
}
1971+
1972+
// Calculate the fee portion that each sweep should pay for the batch.
1973+
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
1974+
spendTx, len(notifyList), totalSweptAmt,
1975+
)
1976+
1977+
for _, sweep := range notifyList {
1978+
// If the sweep's notifier is empty then this means that a swap
1979+
// is not waiting to read an update from it, so we can skip
1980+
// the notification part.
1981+
if sweep.notifier == nil ||
1982+
*sweep.notifier == (SpendNotifier{}) {
1983+
1984+
continue
1985+
}
1986+
1987+
spendDetail := SpendDetail{
1988+
Tx: spendTx,
1989+
OnChainFeePortion: getFeePortionPaidBySweep(
1990+
spendTx, feePortionPaidPerSweep,
1991+
roundingDifference, &sweep,
1992+
),
1993+
}
1994+
1995+
// Dispatch the sweep notifier, we don't care about the outcome
1996+
// of this action so we don't wait for it.
1997+
go sweep.notifySweepSpend(ctx, &spendDetail)
1998+
}
1999+
2000+
b.Infof("spent, confirmed sweeps: %v", confirmedSweeps)
2001+
2002+
// We are no longer able to accept new sweeps, so we mark the batch as
2003+
// closed and persist on storage.
2004+
b.state = Closed
2005+
2006+
if err := b.persist(ctx); err != nil {
2007+
return fmt.Errorf("saving batch failed: %w", err)
2008+
}
2009+
2010+
if err := b.monitorConfirmations(ctx); err != nil {
2011+
return fmt.Errorf("monitorConfirmations failed: %w", err)
2012+
}
2013+
2014+
return nil
2015+
}
2016+
2017+
// handleConf handles a confirmation notification. This is the final step of the
2018+
// batch. Here we signal to the batcher that this batch was completed.
2019+
func (b *batch) handleConf(ctx context.Context,
2020+
conf *chainntnfs.TxConfirmation) error {
2021+
2022+
spendTx := conf.Tx
2023+
txHash := spendTx.TxHash()
2024+
if b.batchTxid == nil || *b.batchTxid != txHash {
2025+
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2026+
"txid %v, but confirmation notification has txif %v. "+
2027+
"Using the later.", b.batchTxid, txHash)
2028+
}
2029+
b.batchTxid = &txHash
2030+
2031+
b.Infof("confirmed in txid %s", b.batchTxid)
2032+
b.state = Confirmed
2033+
2034+
if err := b.persist(ctx); err != nil {
2035+
return fmt.Errorf("saving batch failed: %w", err)
2036+
}
2037+
2038+
// If the batch is in presigned mode, cleanup presignedHelper.
19472039
presigned, err := b.isPresigned()
19482040
if err != nil {
19492041
return fmt.Errorf("failed to determine if the batch %d uses "+
@@ -1965,38 +2057,43 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19652057
}
19662058
}
19672059

2060+
// Make a set of confirmed sweeps.
2061+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
2062+
for _, txIn := range spendTx.TxIn {
2063+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
2064+
}
2065+
19682066
// As a previous version of the batch transaction may get confirmed,
19692067
// which does not contain the latest sweeps, we need to detect the
19702068
// sweeps that did not make it to the confirmed transaction and feed
19712069
// them back to the batcher. This will ensure that the sweeps will enter
19722070
// a new batch instead of remaining dangling.
19732071
var (
1974-
totalSweptAmt btcutil.Amount
19752072
confirmedSweeps = []wire.OutPoint{}
1976-
purgedSweeps = []wire.OutPoint{}
1977-
purgedSwaps = []lntypes.Hash{}
2073+
purgeList = make([]SweepRequest, 0, len(b.sweeps))
19782074
)
19792075
for _, sweep := range allSweeps {
1980-
found := false
1981-
1982-
for _, txIn := range spendTx.TxIn {
1983-
if txIn.PreviousOutPoint == sweep.outpoint {
1984-
found = true
1985-
totalSweptAmt += sweep.value
1986-
notifyList = append(notifyList, sweep)
1987-
confirmedSweeps = append(
1988-
confirmedSweeps, sweep.outpoint,
1989-
)
2076+
_, found := confirmedSet[sweep.outpoint]
2077+
if found {
2078+
// Save the sweep as completed. Note that sweeps are
2079+
// marked completed after the batch is marked confirmed
2080+
// because the check in handleSweeps checks sweep's
2081+
// status first and then checks the batch status.
2082+
err := b.persistSweep(ctx, sweep, true)
2083+
if err != nil {
2084+
return err
19902085
}
2086+
2087+
confirmedSweeps = append(
2088+
confirmedSweeps, sweep.outpoint,
2089+
)
2090+
2091+
continue
19912092
}
19922093

19932094
// If the sweep's outpoint was not found in the transaction's
19942095
// inputs this means it was left out. So we delete it from this
19952096
// batch and feed it back to the batcher.
1996-
if found {
1997-
continue
1998-
}
1999-
20002097
newSweep := sweep
20012098
delete(b.sweeps, sweep.outpoint)
20022099

@@ -2023,46 +2120,19 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
20232120
})
20242121
}
20252122
}
2123+
var (
2124+
purgedSweeps = []wire.OutPoint{}
2125+
purgedSwaps = []lntypes.Hash{}
2126+
)
20262127
for _, sweepReq := range purgeList {
20272128
purgedSwaps = append(purgedSwaps, sweepReq.SwapHash)
20282129
for _, input := range sweepReq.Inputs {
20292130
purgedSweeps = append(purgedSweeps, input.Outpoint)
20302131
}
20312132
}
20322133

2033-
// Calculate the fee portion that each sweep should pay for the batch.
2034-
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
2035-
spendTx, len(notifyList), totalSweptAmt,
2036-
)
2037-
2038-
for _, sweep := range notifyList {
2039-
// Save the sweep as completed.
2040-
err := b.persistSweep(ctx, sweep, true)
2041-
if err != nil {
2042-
return err
2043-
}
2044-
2045-
// If the sweep's notifier is empty then this means that a swap
2046-
// is not waiting to read an update from it, so we can skip
2047-
// the notification part.
2048-
if sweep.notifier == nil ||
2049-
*sweep.notifier == (SpendNotifier{}) {
2050-
2051-
continue
2052-
}
2053-
2054-
spendDetail := SpendDetail{
2055-
Tx: spendTx,
2056-
OnChainFeePortion: getFeePortionPaidBySweep(
2057-
spendTx, feePortionPaidPerSweep,
2058-
roundingDifference, &sweep,
2059-
),
2060-
}
2061-
2062-
// Dispatch the sweep notifier, we don't care about the outcome
2063-
// of this action so we don't wait for it.
2064-
go sweep.notifySweepSpend(ctx, &spendDetail)
2065-
}
2134+
b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+
2135+
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
20662136

20672137
// Proceed with purging the sweeps. This will feed the sweeps that
20682138
// didn't make it to the confirmed batch transaction back to the batcher
@@ -2080,46 +2150,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
20802150
}
20812151
}()
20822152

2083-
b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+
2084-
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
2085-
2086-
// We are no longer able to accept new sweeps, so we mark the batch as
2087-
// closed and persist on storage.
2088-
b.state = Closed
2089-
2090-
if err := b.persist(ctx); err != nil {
2091-
return fmt.Errorf("saving batch failed: %w", err)
2092-
}
2093-
2094-
err = b.monitorConfirmations(ctx)
2095-
if err != nil {
2096-
return fmt.Errorf("monitorConfirmations failed: %w", err)
2097-
}
2098-
2099-
return nil
2100-
}
2101-
2102-
// handleConf handles a confirmation notification. This is the final step of the
2103-
// batch. Here we signal to the batcher that this batch was completed.
2104-
func (b *batch) handleConf(ctx context.Context,
2105-
conf *chainntnfs.TxConfirmation) error {
2106-
2107-
spendTx := conf.Tx
2108-
txHash := spendTx.TxHash()
2109-
if b.batchTxid == nil || *b.batchTxid != txHash {
2110-
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2111-
"txid %v, but confirmation notification has txif %v. "+
2112-
"Using the later.", b.batchTxid, txHash)
2113-
}
2114-
b.batchTxid = &txHash
2115-
2116-
// If the batch is in presigned mode, cleanup presignedHelper.
2117-
presigned, err := b.isPresigned()
2118-
if err != nil {
2119-
return fmt.Errorf("failed to determine if the batch %d uses "+
2120-
"presigned mode: %w", b.id, err)
2121-
}
2122-
21232153
if presigned {
21242154
b.Infof("Cleaning up presigned store")
21252155

@@ -2135,13 +2165,6 @@ func (b *batch) handleConf(ctx context.Context,
21352165
}
21362166
}
21372167

2138-
b.Infof("confirmed in txid %s", b.batchTxid)
2139-
b.state = Confirmed
2140-
2141-
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2142-
return fmt.Errorf("failed to store confirmed state: %w", err)
2143-
}
2144-
21452168
// Send the confirmation to all the notifiers.
21462169
for _, s := range b.sweeps {
21472170
// If the sweep's notifier is empty then this means that

sweepbatcher/sweep_batcher.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -756,13 +756,13 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
756756
return err
757757
}
758758

759-
infof("Batcher handling sweep %x, presigned=%v, completed=%v",
759+
infof("Batcher handling sweep %x, presigned=%v, fully_confirmed=%v",
760760
sweep.swapHash[:6], sweep.presigned, completed)
761761

762762
// If the sweep has already been completed in a confirmed batch then we
763763
// can't attach its notifier to the batch as that is no longer running.
764764
// Instead we directly detect and return the spend here.
765-
if completed && *notifier != (SpendNotifier{}) {
765+
if completed {
766766
// Verify that the parent batch is confirmed. Note that a batch
767767
// is only considered confirmed after it has received three
768768
// on-chain confirmations to prevent issues caused by reorgs.
@@ -1064,6 +1064,11 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
10641064
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
10651065
parentBatchID int32, notifier *SpendNotifier) error {
10661066

1067+
// If the caller has not provided a notifier, stop.
1068+
if notifier == nil || *notifier == (SpendNotifier{}) {
1069+
return nil
1070+
}
1071+
10671072
spendCtx, cancel := context.WithCancel(ctx)
10681073

10691074
// Then we get the total amount that was swept by the batch.

sweepbatcher/sweep_batcher_test.go

+4-11
Original file line numberDiff line numberDiff line change
@@ -2302,22 +2302,15 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore,
23022302
return b.state == Closed
23032303
}, test.Timeout, eventuallyCheckFrequency)
23042304

2305-
// Since second batch was created we check that it registered for its
2306-
// primary sweep's spend.
2307-
<-lnd.RegisterSpendChannel
2308-
2309-
// While handling the spend notification the batch should detect that
2310-
// some sweeps did not appear in the spending tx, therefore it redirects
2311-
// them back to the batcher and the batcher inserts them in a new batch.
2312-
require.Eventually(t, func() bool {
2313-
return batcher.numBatches(ctx) == 2
2314-
}, test.Timeout, eventuallyCheckFrequency)
2315-
23162305
// We mock the confirmation notification.
23172306
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
23182307
Tx: spendingTx,
23192308
}
23202309

2310+
// Since second batch was created we check that it registered for its
2311+
// primary sweep's spend.
2312+
<-lnd.RegisterSpendChannel
2313+
23212314
// Wait for tx to be published.
23222315
// Here is a race condition, which is unlikely to cause a crash: if we
23232316
// wait for publish tx before sending a conf notification (previous

0 commit comments

Comments
 (0)