Skip to content

Commit

Permalink
Adding cluster recovery tests
Browse files Browse the repository at this point in the history
Signed-off-by: tohar-ayash <toharayash1@gmail.com>
  • Loading branch information
tohar-ayash committed Feb 21, 2022
1 parent 26ee576 commit 45706b4
Showing 1 changed file with 346 additions and 0 deletions.
346 changes: 346 additions & 0 deletions test/cluster/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -290,3 +291,348 @@ func TestNodeRecoveryLeader(t *testing.T) {
return c.AgreedHeight(t, 5, 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)
}

//Scenario:
// round 1:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - submit a tx.
// - stop the leader.
// round 2:
// - find leader2.
// - shut down the follower.
// - submit a tx to the remaining server.
// - there is no majority to pick leader => tx fails.
// - restart one server so now there will be a majority to pick a leader.
// round 3:
// - find leader3.
// - submit a tx => tx accepted.
// - restart the third server => all 3 nodes are active.
// round 4:
// - find leader4.
// - submit a tx => tx accepted.
func TestNoMajorityToChooseLeader(t *testing.T) {
dir, err := ioutil.TempDir("", "int-test")
require.NoError(t, err)

nPort, pPort := getPorts(3)
setupConfig := &setup.Config{
NumberOfServers: 3,
TestDirAbsolutePath: dir,
BDBBinaryPath: "../../bin/bdb",
CmdTimeout: 10 * time.Second,
BaseNodePort: nPort,
BasePeerPort: pPort,
CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
},
}
c, err := setup.NewCluster(setupConfig)
require.NoError(t, err)
defer c.ShutdownAndCleanup()

require.NoError(t, c.Start())

leaderRound1 := -1
require.Eventually(t, func() bool {
leaderRound1 = c.AgreedLeader(t, 0, 1, 2)
return leaderRound1 >= 0
}, 30*time.Second, 100*time.Millisecond)

txID, rcpt, err := c.Servers[leaderRound1].WriteDataTx(t, worldstate.DefaultDBName, "alice", []byte("alice-data"))
require.NoError(t, err)
require.NotNil(t, rcpt)
require.True(t, txID != "")
require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
t.Logf("tx submitted: %s, %+v", txID, rcpt)

var dataEnv *types.GetDataResponseEnvelope
require.Eventually(t, func() bool {
dataEnv, err = c.Servers[leaderRound1].QueryData(t, worldstate.DefaultDBName, "alice")
return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
}, 30*time.Second, 100*time.Millisecond)
dataResp := dataEnv.GetResponse().GetValue()
require.Equal(t, dataResp, []byte("alice-data"))
t.Logf("data: %+v", string(dataResp))

require.Eventually(t, func() bool {
return c.AgreedHeight(t, 2, 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)

follower1 := (leaderRound1 + 1) % 3
follower2 := (leaderRound1 + 2) % 3

require.NoError(t, c.ShutdownServer(c.Servers[leaderRound1]))

//find new leader and make sure there are only 2 active nodes
leaderRound2 := -1
require.Eventually(t, func() bool {
leaderRound2 = c.AgreedLeader(t, follower1, follower2)
return leaderRound2 >= 0
}, 30*time.Second, 100*time.Millisecond)

t.Logf("Stopped node %d, new leader index is: %d; 2-node quorum", leaderRound1, leaderRound2)

follower1Round2 := follower1
follower2Round2 := leaderRound1
if leaderRound2 == follower1 {
follower1Round2 = follower2
}

//servers alive: leaderRound2, follower1Round2
require.Eventually(t, func() bool {
clusterStatusResEnv, err := c.Servers[leaderRound2].QueryClusterStatus(t)
return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 2
}, 30*time.Second, 100*time.Millisecond)

require.NoError(t, c.ShutdownServer(c.Servers[follower1Round2]))

//only one server is active now => no majority to pick leader
require.Eventually(t, func() bool {
_, _, err = c.Servers[leaderRound2].WriteDataTx(t, worldstate.DefaultDBName, "bob", []byte("bob-data"))
return err != nil && err.Error() == "failed to submit transaction, server returned: status: 503 Service Unavailable, message: Cluster leader unavailable"
}, 60*time.Second, 100*time.Millisecond)

//restart one server => 2 servers are active: leaderRound1, leaderRound2
require.NoError(t, c.StartServer(c.Servers[follower1Round2]))

//find the new leader
leaderRound3 := -1
require.Eventually(t, func() bool {
leaderRound3 = c.AgreedLeader(t, leaderRound2, follower1Round2)
return leaderRound3 >= 0
}, 30*time.Second, 100*time.Millisecond)

t.Logf("Started node %d, leader index is: %d; 2-node quorum", follower1Round2, leaderRound3)

follower1Round3 := leaderRound2
if leaderRound3 == leaderRound2 {
follower1Round3 = follower1Round2
}

txID, rcpt, err = c.Servers[leaderRound3].WriteDataTx(t, worldstate.DefaultDBName, "charlie", []byte("charlie-data"))
require.NoError(t, err)
require.NotNil(t, rcpt)
require.True(t, txID != "")
require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
t.Logf("tx submitted: %s, %+v", txID, rcpt)

require.Eventually(t, func() bool {
dataEnv, err = c.Servers[leaderRound3].QueryData(t, worldstate.DefaultDBName, "charlie")
return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
}, 30*time.Second, 100*time.Millisecond)

dataResp = dataEnv.GetResponse().GetValue()
require.Equal(t, dataResp, []byte("charlie-data"))
t.Logf("data: %+v", string(dataResp))

require.Eventually(t, func() bool {
return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, leaderRound3, follower1Round3)
}, 30*time.Second, 100*time.Millisecond)

//restart the third node => all 3 nodes are active
require.NoError(t, c.StartServer(c.Servers[follower2Round2]))

//find the new leader
leaderRound4 := -1
require.Eventually(t, func() bool {
leaderRound4 = c.AgreedLeader(t, 0, 1, 2)
return leaderRound4 >= 0
}, 30*time.Second, 100*time.Millisecond)

t.Logf("Started node %d, leader index is: %d; all 3 nodes are up", follower2Round2, leaderRound4)

txID, rcpt, err = c.Servers[leaderRound4].WriteDataTx(t, worldstate.DefaultDBName, "dan", []byte("dan-data"))
require.NoError(t, err)
require.NotNil(t, rcpt)
require.True(t, txID != "")
require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
t.Logf("tx submitted: %s, %+v", txID, rcpt)

require.Eventually(t, func() bool {
dataEnv, err = c.Servers[leaderRound4].QueryData(t, worldstate.DefaultDBName, "dan")
return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
}, 30*time.Second, 100*time.Millisecond)

dataResp = dataEnv.GetResponse().GetValue()
require.Equal(t, dataResp, []byte("dan-data"))
t.Logf("data: %+v", string(dataResp))

require.Eventually(t, func() bool {
return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)
}

// Scenario:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - submit 2 txs.
// - shutting down and starting all servers in the cluster.
// - make sure the servers are in sync with the txs made before they were shut down.
func TestClusterRestart(t *testing.T) {
dir, err := ioutil.TempDir("", "int-test")
require.NoError(t, err)

nPort, pPort := getPorts(3)
setupConfig := &setup.Config{
NumberOfServers: 3,
TestDirAbsolutePath: dir,
BDBBinaryPath: "../../bin/bdb",
CmdTimeout: 10 * time.Second,
BaseNodePort: nPort,
BasePeerPort: pPort,
CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
},
}
c, err := setup.NewCluster(setupConfig)
require.NoError(t, err)
defer c.ShutdownAndCleanup()

require.NoError(t, c.Start())

//find the leader
leaderIndex := -1
require.Eventually(t, func() bool {
leaderIndex = c.AgreedLeader(t, 0, 1, 2)
return leaderIndex >= 0
}, 30*time.Second, 100*time.Millisecond)

keys := []string{"alice", "bob"}
for _, key := range keys {
txID, rcpt, err := c.Servers[leaderIndex].WriteDataTx(t, worldstate.DefaultDBName, key, []byte(key+"-data"))
require.NoError(t, err)
require.NotNil(t, rcpt)
require.True(t, txID != "")
require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
t.Logf("tx submitted: %s, %+v", txID, rcpt)
}

require.NoError(t, c.Restart())

//find the new leader
newLeader := -1
require.Eventually(t, func() bool {
newLeader = c.AgreedLeader(t, 0, 1, 2)
return newLeader >= 0
}, 30*time.Second, 100*time.Millisecond)

//make sure all 3 nodes are active
require.Eventually(t, func() bool {
clusterStatusResEnv, err := c.Servers[newLeader].QueryClusterStatus(t)
return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
}, 30*time.Second, 100*time.Millisecond)

require.Eventually(t, func() bool {
return c.AgreedHeight(t, 3, 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)

for _, key := range keys {
require.Eventually(t, func() bool {
dataEnv, err := c.Servers[newLeader].QueryData(t, worldstate.DefaultDBName, key)
if dataEnv != nil && dataEnv.GetResponse().GetValue() != nil {
dataVal := dataEnv.GetResponse().GetValue()
require.Equal(t, dataVal, []byte(key+"-data"))
t.Logf("data: %+v", dataVal)
}
return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
}, 30*time.Second, 100*time.Millisecond)
}
}

// Scenario:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - stop the leader.
// - wait for new server to be the leader.
// - the leader submit a tx.
// - repeat until each server was a leader at least once.
// - make sure each node submitted a valid tx as a leader.
func TestAllNodesGetLeadership(t *testing.T) {
dir, err := ioutil.TempDir("", "int-test")
require.NoError(t, err)

nPort, pPort := getPorts(3)
setupConfig := &setup.Config{
NumberOfServers: 3,
TestDirAbsolutePath: dir,
BDBBinaryPath: "../../bin/bdb",
CmdTimeout: 10 * time.Second,
BaseNodePort: nPort,
BasePeerPort: pPort,
CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
},
}
c, err := setup.NewCluster(setupConfig)
require.NoError(t, err)
defer c.ShutdownAndCleanup()

require.NoError(t, c.Start())

leaders := [3]bool{false, false, false}

currentLeader := -1
require.Eventually(t, func() bool {
currentLeader = c.AgreedLeader(t, 0, 1, 2)
return currentLeader >= 0
}, 30*time.Second, 100*time.Millisecond)

txsCount := 0
oldLeader := -1
for !leaders[0] || !leaders[1] || !leaders[2] {
leaders[currentLeader] = true
txID, rcpt, err := c.Servers[currentLeader].WriteDataTx(t, worldstate.DefaultDBName, strconv.Itoa(currentLeader), []byte{uint8(currentLeader)})
require.NoError(t, err)
require.NotNil(t, rcpt)
require.True(t, txID != "")
require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
t.Logf("tx submitted: %s, %+v", txID, rcpt)
txsCount++

require.NoError(t, c.ShutdownServer(c.Servers[currentLeader]))

if oldLeader != -1 {
require.NoError(t, c.StartServer(c.Servers[oldLeader]))
}
oldLeader = currentLeader
require.Eventually(t, func() bool {
currentLeader = c.AgreedLeader(t, (oldLeader+1)%3, (oldLeader+2)%3)
return currentLeader >= 0
}, 30*time.Second, 100*time.Millisecond)
}

require.NoError(t, c.StartServer(c.Servers[oldLeader]))
require.Eventually(t, func() bool {
currentLeader = c.AgreedLeader(t, 0, 1, 2)
return currentLeader >= 0
}, 30*time.Second, 100*time.Millisecond)

//make sure all 3 nodes are active
require.Eventually(t, func() bool {
clusterStatusResEnv, err := c.Servers[currentLeader].QueryClusterStatus(t)
return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
}, 30*time.Second, 100*time.Millisecond)

require.Eventually(t, func() bool {
return c.AgreedHeight(t, uint64(txsCount+1), 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)

for i := 0; i < 3; i++ {
require.Eventually(t, func() bool {
dataEnv, err := c.Servers[currentLeader].QueryData(t, worldstate.DefaultDBName, strconv.Itoa(i))
if dataEnv != nil && dataEnv.GetResponse().GetValue() != nil {
dataVal := dataEnv.GetResponse().GetValue()
require.Equal(t, []byte{uint8(i)}, dataVal)
t.Logf("data: %+v", dataVal)
}
return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
}, 30*time.Second, 100*time.Millisecond)

}
}

0 comments on commit 45706b4

Please sign in to comment.