From da22df69a63b000e41bf2ee2378dcf36f83c0455 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 17 Jan 2025 01:00:46 -0600 Subject: [PATCH] allocator: ensure we read-through all Etcd revisions before continuing It was previously possible for some Etcd transactions to be applied by Checkpoint within a converge() call, and for a failure to then cause converge() to bubble up an error, which could result in not reading through the preceeding successful transactions before continuing. Refactor checkpointTxn to retain the last successful transaction revision, and ensure we read through this revision regardless of whether the converge fully completed or failed along the way. --- allocator/allocator.go | 61 ++++++++++++++++++++++-------------- allocator/allocator_test.go | 12 ++----- allocator/item_state_test.go | 9 +++--- allocator/scenarios_test.go | 10 +++--- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/allocator/allocator.go b/allocator/allocator.go index feb7efad..bdecbd6f 100644 --- a/allocator/allocator.go +++ b/allocator/allocator.go @@ -60,9 +60,8 @@ func Allocate(args AllocateArgs) error { return nil } - // Response of the last transaction we applied. We'll ensure we've minimally - // watched through its revision before driving further action. - var txnResponse *clientv3.TxnResponse + // `next` Etcd revision we must read through before proceeding. + var next = ks.Header.Revision + 1 if state.isLeader() { @@ -104,7 +103,13 @@ func Allocate(args AllocateArgs) error { // Converge the current state towards |desired|. var err error if err = converge(txn, state, desired); err == nil { - txnResponse, err = txn.Commit() + err = txn.Flush() + } + + // We must read through any Etcd transactions applied by `txn`, + // even if it subsequently encountered an error. + if r := txn.Revision(); r > next { + next = r } if err != nil { @@ -118,19 +123,13 @@ func Allocate(args AllocateArgs) error { allocatorNumItemSlots.Set(float64(state.ItemSlots)) if args.TestHook != nil { - args.TestHook(round, txn.noop) + args.TestHook(round, txn.Revision() == 0) } round++ } } - // Await the next KeySpace change. If we completed a transaction, - // ensure we read through its revision before iterating again. - var next = ks.Header.Revision + 1 - - if txnResponse != nil && txnResponse.Header.Revision > next { - next = txnResponse.Header.Revision - } + // Await the next known Etcd revision affecting our KeySpace. if err := ks.WaitForRevision(ctx, next); err != nil { return err } @@ -244,15 +243,24 @@ func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp { // - It allows If and Then to be called multiple times. // - It removes Else, as incompatible with the checkpoint model. As such, // a Txn which does not succeed becomes an error. +// +// If Checkpoint() or Flush() return an error, that error is terminal. +// However, a preceding transaction may have been applied. +// The caller must consult Revision() to determine the Etcd revision +// to read-through before proceeding, or if this transaction was a noop then +// Revision() will be zero. type checkpointTxn interface { If(...clientv3.Cmp) checkpointTxn Then(...clientv3.Op) checkpointTxn - Commit() (*clientv3.TxnResponse, error) // Checkpoint ensures that all If and Then invocations since the last // Checkpoint are issued in the same underlying Txn. It may partially // flush the transaction to Etcd. Checkpoint() error + // Flush a pending checkpoint to Etcd. + Flush() error + // Revision known to this checkpointTxn which should be read through. + Revision() int64 } // batchedTxn implements the checkpointTxn interface, potentially queuing across @@ -270,8 +278,8 @@ type batchedTxn struct { nextOps []clientv3.Op // Cmps which should be asserted on every underlying Txn. fixedCmps []clientv3.Cmp - // Flags whether no operations have committed with this batchedTxn. - noop bool + // Applied revision to be read through. + revision int64 } // newBatchedTxn returns a batchedTxn using the given Context and KV. It will @@ -290,7 +298,7 @@ func newBatchedTxn(ctx context.Context, kv clientv3.KV, fixedCmps ...clientv3.Cm } }, fixedCmps: fixedCmps, - noop: true, + revision: 0, } } @@ -322,7 +330,7 @@ func (b *batchedTxn) Checkpoint() error { b.nextCmps, b.nextOps = b.nextCmps[:0], b.nextOps[:0] if lc, lo := len(b.cmps)+len(nc), len(b.ops)+len(no); lc > maxTxnOps || lo > maxTxnOps { - if _, err := b.Commit(); err != nil { + if err := b.Flush(); err != nil { return err } b.cmps = append(b.cmps, b.fixedCmps...) @@ -333,13 +341,12 @@ func (b *batchedTxn) Checkpoint() error { return nil } -func (b *batchedTxn) Commit() (*clientv3.TxnResponse, error) { +func (b *batchedTxn) Flush() error { if len(b.nextCmps) != 0 || len(b.nextOps) != 0 { panic("must call Checkpoint before Commit") } else if len(b.ops) == 0 { - return nil, nil // No-op. + return nil // No-op. } - var response, err = b.txnDo(clientv3.OpTxn(b.cmps, b.ops, nil)) if log.GetLevel() >= log.DebugLevel { @@ -347,16 +354,22 @@ func (b *batchedTxn) Commit() (*clientv3.TxnResponse, error) { } if err != nil { - return nil, err + return err } else if !response.Succeeded { - return response, fmt.Errorf("transaction checks did not succeed") + // Don't retain the response revision because it may be outside our + // KeySpace, and we'd block indefinitely attempting to await it. + return fmt.Errorf("transaction checks did not succeed") } else { - b.noop = false + b.revision = response.Header.Revision b.cmps, b.ops = b.cmps[:0], b.ops[:0] - return response, nil + return nil } } +func (b *batchedTxn) Revision() int64 { + return b.revision +} + func (b *batchedTxn) debugLogTxn(response *clientv3.TxnResponse, err error) { var dbgCmps, dbgOps []string for _, c := range b.cmps { diff --git a/allocator/allocator_test.go b/allocator/allocator_test.go index 6be9b696..aad7480b 100644 --- a/allocator/allocator_test.go +++ b/allocator/allocator_test.go @@ -159,9 +159,7 @@ func (s *AllocatorSuite) TestTxnBatching(c *gc.C) { )) // Final commit. Expect it flushes the last checkpoint. - var r, err = txn.Commit() - c.Check(r, gc.Equals, txnResp) - c.Check(err, gc.IsNil) + c.Check(txn.Flush(), gc.IsNil) c.Check(txnOp, gc.DeepEquals, clientv3.OpTxn( []clientv3.Cmp{fixedCmp, testCmp}, @@ -172,17 +170,13 @@ func (s *AllocatorSuite) TestTxnBatching(c *gc.C) { // Empty Checkpoint, then Commit. Expect it's treated as a no-op. c.Check(txn.Checkpoint(), gc.IsNil) - r, err = txn.Commit() - c.Check(r, gc.IsNil) - c.Check(err, gc.IsNil) + c.Check(txn.Flush(), gc.IsNil) // Non-empty commit that fails checks. Expect it's mapped to an error. c.Check(txn.Then(testOp).Checkpoint(), gc.IsNil) txnResp.Succeeded = false - r, err = txn.Commit() - c.Check(r, gc.Equals, txnResp) - c.Check(err, gc.ErrorMatches, "transaction checks did not succeed") + c.Check(txn.Flush(), gc.ErrorMatches, "transaction checks did not succeed") } var _ = gc.Suite(&AllocatorSuite{}) diff --git a/allocator/item_state_test.go b/allocator/item_state_test.go index 50df0641..9e5262e9 100644 --- a/allocator/item_state_test.go +++ b/allocator/item_state_test.go @@ -424,9 +424,10 @@ type mockTxnBuilder struct { ops []clientv3.Op } -func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b } -func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b } -func (b *mockTxnBuilder) Checkpoint() error { return nil } -func (b *mockTxnBuilder) Commit() (*clientv3.TxnResponse, error) { panic("not supported") } +func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b } +func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b } +func (b *mockTxnBuilder) Checkpoint() error { return nil } +func (b *mockTxnBuilder) Flush() error { panic("not supported") } +func (b *mockTxnBuilder) Revision() int64 { panic("not supported") } var _ = gc.Suite(&ItemStateSuite{}) diff --git a/allocator/scenarios_test.go b/allocator/scenarios_test.go index bf2db1d5..e6e6b4db 100644 --- a/allocator/scenarios_test.go +++ b/allocator/scenarios_test.go @@ -890,8 +890,7 @@ func insert(ctx context.Context, client *clientv3.Client, keyValues ...string) e return err } } - var _, err = txn.Commit() - return err + return txn.Flush() } // update updates keys with values, requiring that the key already exist. @@ -906,8 +905,7 @@ func update(ctx context.Context, client *clientv3.Client, keyValues ...string) e return err } } - var _, err = txn.Commit() - return err + return txn.Flush() } // markAllConsistent which updates all Assignments to have a value of "consistent". @@ -928,9 +926,9 @@ func markAllConsistent(ctx context.Context, client *clientv3.Client, ks *keyspac } } - if _, err := txn.Commit(); err != nil { + if err := txn.Flush(); err != nil { return err - } else if txn.noop { + } else if txn.Revision() == 0 { return io.ErrNoProgress } else { return nil