Skip to content

Commit

Permalink
Complete coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed Apr 19, 2020
1 parent 6d03114 commit 7898870
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
6 changes: 5 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package goneli

import (
"fmt"

"github.com/obsidiandynamics/libstdgo/concurrent"
)

// Barrier is a callback function for handling Neli events during group rebalancing.
type Barrier func(e Event)

// NopBarrier returns a no-op barrier implementation.
func NopBarrier() Barrier {
return func(e Event) {}
return func(e Event) {
concurrent.Nop()
}
}

// Event encapsulates a Neli event.
Expand Down
42 changes: 36 additions & 6 deletions neli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func TestErrorDuringSubscribe(t *testing.T) {
}

func TestPulseNotLeader(t *testing.T) {
_, _, config, b := fixtures{}.create()
_, _, config, _ := fixtures{}.create()

n, err := New(config, b.barrier())
n, err := New(config)
require.Nil(t, err)

isLeader, err := n.Pulse(1 * time.Millisecond)
Expand All @@ -159,9 +159,9 @@ func TestPulseNotLeader(t *testing.T) {
}

func TestPulseAfterClose(t *testing.T) {
_, _, config, b := fixtures{}.create()
_, _, config, _ := fixtures{}.create()

n, err := New(config, b.barrier())
n, err := New(config)
require.Nil(t, err)

assertNoError(t, n.Close)
Expand All @@ -173,9 +173,9 @@ func TestPulseAfterClose(t *testing.T) {
}

func TestDeadline(t *testing.T) {
_, _, config, b := fixtures{}.create()
_, _, config, _ := fixtures{}.create()

n, err := New(config, b.barrier())
n, err := New(config)
require.Nil(t, err)

assert.Equal(t, n.Deadline().Last(), time.Unix(0, 0))
Expand Down Expand Up @@ -258,6 +258,36 @@ func TestBasicLeaderElectionAndRevocation(t *testing.T) {
assertNoError(t, p.Await)
}

func TestLeaderElectionAndRevocation_nopBarrier(t *testing.T) {
m, cons, config, _ := fixtures{}.create()

n, err := New(config)
require.Nil(t, err)

onLeaderCnt := concurrent.NewAtomicCounter()
p, err := n.Background(func() {
onLeaderCnt.Inc()
})
require.Nil(t, err)

// Starts off in a non-leader state
assert.Equal(t, false, n.IsLeader())

// Assign leadership via the rebalance listener and wait for the assignment to take effect
cons.rebalanceEvents <- assignedPartitions(0, 1, 2)
wait(t).UntilAsserted(isTrue(n.IsLeader))
wait(t).UntilAsserted(m.ContainsEntries().
Having(scribe.LogLevel(scribe.Info)).
Having(scribe.MessageEqual("Elected as leader")).
Passes(scribe.Count(1)))
m.Reset()
wait(t).UntilAsserted(atLeast(1, onLeaderCnt.GetInt))

assertNoError(t, n.Close)
n.Await()
assertNoError(t, p.Await)
}

func TestNonFatalErrorInReadMessage(t *testing.T) {
m, cons, config, _ := fixtures{}.create()

Expand Down

0 comments on commit 7898870

Please sign in to comment.