Skip to content

Commit

Permalink
allocator: ensure we read-through all Etcd revisions before continuing
Browse files Browse the repository at this point in the history
It was previously possible for some Etcd transactions to be applied
by Checkpoint within a converge() call, and for a failure to then cause
converge() to bubble up an error, which could result in not reading
through the preceding successful transactions before continuing.

Refactor checkpointTxn to retain the last successful transaction
revision, and ensure we read through this revision regardless of whether
the converge fully completed or failed along the way.
  • Loading branch information
jgraettinger committed Jan 17, 2025
1 parent 7a8e3aa commit da22df6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 43 deletions.
61 changes: 37 additions & 24 deletions allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ func Allocate(args AllocateArgs) error {
return nil
}

// Response of the last transaction we applied. We'll ensure we've minimally
// watched through its revision before driving further action.
var txnResponse *clientv3.TxnResponse
// `next` Etcd revision we must read through before proceeding.
var next = ks.Header.Revision + 1

if state.isLeader() {

Expand Down Expand Up @@ -104,7 +103,13 @@ func Allocate(args AllocateArgs) error {
// Converge the current state towards |desired|.
var err error
if err = converge(txn, state, desired); err == nil {
txnResponse, err = txn.Commit()
err = txn.Flush()
}

// We must read through any Etcd transactions applied by `txn`,
// even if it subsequently encountered an error.
if r := txn.Revision(); r > next {
next = r
}

if err != nil {
Expand All @@ -118,19 +123,13 @@ func Allocate(args AllocateArgs) error {
allocatorNumItemSlots.Set(float64(state.ItemSlots))

if args.TestHook != nil {
args.TestHook(round, txn.noop)
args.TestHook(round, txn.Revision() == 0)
}
round++
}
}

// Await the next KeySpace change. If we completed a transaction,
// ensure we read through its revision before iterating again.
var next = ks.Header.Revision + 1

if txnResponse != nil && txnResponse.Header.Revision > next {
next = txnResponse.Header.Revision
}
// Await the next known Etcd revision affecting our KeySpace.
if err := ks.WaitForRevision(ctx, next); err != nil {
return err
}
Expand Down Expand Up @@ -244,15 +243,24 @@ func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp {
// - It allows If and Then to be called multiple times.
// - It removes Else, as incompatible with the checkpoint model. As such,
// a Txn which does not succeed becomes an error.
//
// If Checkpoint() or Flush() returns an error, that error is terminal.
// However, a preceding transaction may already have been applied.
// The caller must consult Revision() to determine the Etcd revision
// to read through before proceeding. If this transaction was a no-op,
// Revision() will be zero.
type checkpointTxn interface {
If(...clientv3.Cmp) checkpointTxn
Then(...clientv3.Op) checkpointTxn
Commit() (*clientv3.TxnResponse, error)

// Checkpoint ensures that all If and Then invocations since the last
// Checkpoint are issued in the same underlying Txn. It may partially
// flush the transaction to Etcd.
Checkpoint() error
// Flush a pending checkpoint to Etcd.
Flush() error
// Revision known to this checkpointTxn which should be read through.
Revision() int64
}

// batchedTxn implements the checkpointTxn interface, potentially queuing across
Expand All @@ -270,8 +278,8 @@ type batchedTxn struct {
nextOps []clientv3.Op
// Cmps which should be asserted on every underlying Txn.
fixedCmps []clientv3.Cmp
// Flags whether no operations have committed with this batchedTxn.
noop bool
// Applied revision to be read through.
revision int64
}

// newBatchedTxn returns a batchedTxn using the given Context and KV. It will
Expand All @@ -290,7 +298,7 @@ func newBatchedTxn(ctx context.Context, kv clientv3.KV, fixedCmps ...clientv3.Cm
}
},
fixedCmps: fixedCmps,
noop: true,
revision: 0,
}
}

Expand Down Expand Up @@ -322,7 +330,7 @@ func (b *batchedTxn) Checkpoint() error {
b.nextCmps, b.nextOps = b.nextCmps[:0], b.nextOps[:0]

if lc, lo := len(b.cmps)+len(nc), len(b.ops)+len(no); lc > maxTxnOps || lo > maxTxnOps {
if _, err := b.Commit(); err != nil {
if err := b.Flush(); err != nil {
return err
}
b.cmps = append(b.cmps, b.fixedCmps...)
Expand All @@ -333,30 +341,35 @@ func (b *batchedTxn) Checkpoint() error {
return nil
}

func (b *batchedTxn) Commit() (*clientv3.TxnResponse, error) {
func (b *batchedTxn) Flush() error {
if len(b.nextCmps) != 0 || len(b.nextOps) != 0 {
panic("must call Checkpoint before Commit")
} else if len(b.ops) == 0 {
return nil, nil // No-op.
return nil // No-op.
}

var response, err = b.txnDo(clientv3.OpTxn(b.cmps, b.ops, nil))

if log.GetLevel() >= log.DebugLevel {
b.debugLogTxn(response, err)
}

if err != nil {
return nil, err
return err
} else if !response.Succeeded {
return response, fmt.Errorf("transaction checks did not succeed")
// Don't retain the response revision because it may be outside our
// KeySpace, and we'd block indefinitely attempting to await it.
return fmt.Errorf("transaction checks did not succeed")
} else {
b.noop = false
b.revision = response.Header.Revision
b.cmps, b.ops = b.cmps[:0], b.ops[:0]
return response, nil
return nil
}
}

// Revision returns the Etcd revision of the last underlying transaction
// that Flush applied successfully, which the caller should read through
// before proceeding. It is zero if no transaction has been applied: a
// transaction whose checks did not succeed does not update it.
func (b *batchedTxn) Revision() int64 {
return b.revision
}

func (b *batchedTxn) debugLogTxn(response *clientv3.TxnResponse, err error) {
var dbgCmps, dbgOps []string
for _, c := range b.cmps {
Expand Down
12 changes: 3 additions & 9 deletions allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ func (s *AllocatorSuite) TestTxnBatching(c *gc.C) {
))

// Final commit. Expect it flushes the last checkpoint.
var r, err = txn.Commit()
c.Check(r, gc.Equals, txnResp)
c.Check(err, gc.IsNil)
c.Check(txn.Flush(), gc.IsNil)

c.Check(txnOp, gc.DeepEquals, clientv3.OpTxn(
[]clientv3.Cmp{fixedCmp, testCmp},
Expand All @@ -172,17 +170,13 @@ func (s *AllocatorSuite) TestTxnBatching(c *gc.C) {
// Empty Checkpoint, then Flush. Expect it's treated as a no-op.
c.Check(txn.Checkpoint(), gc.IsNil)

r, err = txn.Commit()
c.Check(r, gc.IsNil)
c.Check(err, gc.IsNil)
c.Check(txn.Flush(), gc.IsNil)

// Non-empty commit that fails checks. Expect it's mapped to an error.
c.Check(txn.Then(testOp).Checkpoint(), gc.IsNil)
txnResp.Succeeded = false

r, err = txn.Commit()
c.Check(r, gc.Equals, txnResp)
c.Check(err, gc.ErrorMatches, "transaction checks did not succeed")
c.Check(txn.Flush(), gc.ErrorMatches, "transaction checks did not succeed")
}

var _ = gc.Suite(&AllocatorSuite{})
Expand Down
9 changes: 5 additions & 4 deletions allocator/item_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,10 @@ type mockTxnBuilder struct {
ops []clientv3.Op
}

func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b }
func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b }
func (b *mockTxnBuilder) Checkpoint() error { return nil }
func (b *mockTxnBuilder) Commit() (*clientv3.TxnResponse, error) { panic("not supported") }
func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b }
func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b }
func (b *mockTxnBuilder) Checkpoint() error { return nil }
func (b *mockTxnBuilder) Flush() error { panic("not supported") }
func (b *mockTxnBuilder) Revision() int64 { panic("not supported") }

var _ = gc.Suite(&ItemStateSuite{})
10 changes: 4 additions & 6 deletions allocator/scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,7 @@ func insert(ctx context.Context, client *clientv3.Client, keyValues ...string) e
return err
}
}
var _, err = txn.Commit()
return err
return txn.Flush()
}

// update updates keys with values, requiring that the key already exist.
Expand All @@ -906,8 +905,7 @@ func update(ctx context.Context, client *clientv3.Client, keyValues ...string) e
return err
}
}
var _, err = txn.Commit()
return err
return txn.Flush()
}

// markAllConsistent which updates all Assignments to have a value of "consistent".
Expand All @@ -928,9 +926,9 @@ func markAllConsistent(ctx context.Context, client *clientv3.Client, ks *keyspac
}
}

if _, err := txn.Commit(); err != nil {
if err := txn.Flush(); err != nil {
return err
} else if txn.noop {
} else if txn.Revision() == 0 {
return io.ErrNoProgress
} else {
return nil
Expand Down

0 comments on commit da22df6

Please sign in to comment.