Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster tests #351

Merged
merged 1 commit into from
Feb 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 346 additions & 0 deletions test/cluster/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -290,3 +291,348 @@ func TestNodeRecoveryLeader(t *testing.T) {
return c.AgreedHeight(t, 5, 0, 1, 2)
}, 30*time.Second, 100*time.Millisecond)
}

//Scenario:
// round 1:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - submit a tx.
// - stop the leader.
// round 2:
// - find leader2.
// - shut down the follower.
// - submit a tx to the remaining server.
// - there is no majority to pick leader => tx fails.
// - restart one server so now there will be a majority to pick a leader.
// round 3:
// - find leader3.
// - submit a tx => tx accepted.
// - restart the third server => all 3 nodes are active.
// round 4:
// - find leader4.
// - submit a tx => tx accepted.
func TestNoMajorityToChooseLeader(t *testing.T) {
	dir, err := ioutil.TempDir("", "int-test")
	require.NoError(t, err)

	nPort, pPort := getPorts(3)
	setupConfig := &setup.Config{
		NumberOfServers:     3,
		TestDirAbsolutePath: dir,
		BDBBinaryPath:       "../../bin/bdb",
		CmdTimeout:          10 * time.Second,
		BaseNodePort:        nPort,
		BasePeerPort:        pPort,
		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
		},
	}
	c, err := setup.NewCluster(setupConfig)
	require.NoError(t, err)
	defer c.ShutdownAndCleanup()

	require.NoError(t, c.Start())

	// Round 1: wait until all 3 nodes agree on a leader.
	leaderRound1 := -1
	require.Eventually(t, func() bool {
		leaderRound1 = c.AgreedLeader(t, 0, 1, 2)
		return leaderRound1 >= 0
	}, 30*time.Second, 100*time.Millisecond)

	// Submit a tx to the leader and verify it was committed as valid.
	txID, rcpt, err := c.Servers[leaderRound1].WriteDataTx(t, worldstate.DefaultDBName, "alice", []byte("alice-data"))
	require.NoError(t, err)
	require.NotNil(t, rcpt)
	require.NotEmpty(t, txID)
	require.NotEmpty(t, rcpt.GetHeader().GetValidationInfo())
	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
	t.Logf("tx submitted: %s, %+v", txID, rcpt)

	var dataEnv *types.GetDataResponseEnvelope
	require.Eventually(t, func() bool {
		dataEnv, err = c.Servers[leaderRound1].QueryData(t, worldstate.DefaultDBName, "alice")
		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
	}, 30*time.Second, 100*time.Millisecond)
	dataResp := dataEnv.GetResponse().GetValue()
	// require.Equal takes (t, expected, actual) — expected value first.
	require.Equal(t, []byte("alice-data"), dataResp)
	t.Logf("data: %+v", string(dataResp))

	require.Eventually(t, func() bool {
		return c.AgreedHeight(t, 2, 0, 1, 2)
	}, 30*time.Second, 100*time.Millisecond)

	follower1 := (leaderRound1 + 1) % 3
	follower2 := (leaderRound1 + 2) % 3

	// Round 2: stop the leader; the 2 remaining nodes still form a quorum.
	require.NoError(t, c.ShutdownServer(c.Servers[leaderRound1]))

	//find new leader and make sure there are only 2 active nodes
	leaderRound2 := -1
	require.Eventually(t, func() bool {
		leaderRound2 = c.AgreedLeader(t, follower1, follower2)
		return leaderRound2 >= 0
	}, 30*time.Second, 100*time.Millisecond)

	t.Logf("Stopped node %d, new leader index is: %d; 2-node quorum", leaderRound1, leaderRound2)

	// follower1Round2 is the live follower of round 2; follower2Round2 is the
	// node already down (the round-1 leader).
	follower1Round2 := follower1
	follower2Round2 := leaderRound1
	if leaderRound2 == follower1 {
		follower1Round2 = follower2
	}

	//servers alive: leaderRound2, follower1Round2
	require.Eventually(t, func() bool {
		clusterStatusResEnv, err := c.Servers[leaderRound2].QueryClusterStatus(t)
		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 2
	}, 30*time.Second, 100*time.Millisecond)

	require.NoError(t, c.ShutdownServer(c.Servers[follower1Round2]))

	//only one server is active now => no majority to pick leader.
	// NOTE: this matches the server's exact 503 error text; brittle if the
	// server-side message changes.
	require.Eventually(t, func() bool {
		_, _, err = c.Servers[leaderRound2].WriteDataTx(t, worldstate.DefaultDBName, "bob", []byte("bob-data"))
		return err != nil && err.Error() == "failed to submit transaction, server returned: status: 503 Service Unavailable, message: Cluster leader unavailable"
	}, 60*time.Second, 100*time.Millisecond)

	//restart one server => 2 servers are active: leaderRound2, follower1Round2
	require.NoError(t, c.StartServer(c.Servers[follower1Round2]))

	// Round 3: with 2 of 3 nodes up there is a quorum again.
	leaderRound3 := -1
	require.Eventually(t, func() bool {
		leaderRound3 = c.AgreedLeader(t, leaderRound2, follower1Round2)
		return leaderRound3 >= 0
	}, 30*time.Second, 100*time.Millisecond)

	t.Logf("Started node %d, leader index is: %d; 2-node quorum", follower1Round2, leaderRound3)

	follower1Round3 := leaderRound2
	if leaderRound3 == leaderRound2 {
		follower1Round3 = follower1Round2
	}

	txID, rcpt, err = c.Servers[leaderRound3].WriteDataTx(t, worldstate.DefaultDBName, "charlie", []byte("charlie-data"))
	require.NoError(t, err)
	require.NotNil(t, rcpt)
	require.NotEmpty(t, txID)
	require.NotEmpty(t, rcpt.GetHeader().GetValidationInfo())
	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
	t.Logf("tx submitted: %s, %+v", txID, rcpt)

	require.Eventually(t, func() bool {
		dataEnv, err = c.Servers[leaderRound3].QueryData(t, worldstate.DefaultDBName, "charlie")
		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
	}, 30*time.Second, 100*time.Millisecond)

	dataResp = dataEnv.GetResponse().GetValue()
	require.Equal(t, []byte("charlie-data"), dataResp)
	t.Logf("data: %+v", string(dataResp))

	require.Eventually(t, func() bool {
		return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, leaderRound3, follower1Round3)
	}, 30*time.Second, 100*time.Millisecond)

	// Round 4: restart the third node => all 3 nodes are active.
	require.NoError(t, c.StartServer(c.Servers[follower2Round2]))

	//find the new leader
	leaderRound4 := -1
	require.Eventually(t, func() bool {
		leaderRound4 = c.AgreedLeader(t, 0, 1, 2)
		return leaderRound4 >= 0
	}, 30*time.Second, 100*time.Millisecond)

	t.Logf("Started node %d, leader index is: %d; all 3 nodes are up", follower2Round2, leaderRound4)

	txID, rcpt, err = c.Servers[leaderRound4].WriteDataTx(t, worldstate.DefaultDBName, "dan", []byte("dan-data"))
	require.NoError(t, err)
	require.NotNil(t, rcpt)
	require.NotEmpty(t, txID)
	require.NotEmpty(t, rcpt.GetHeader().GetValidationInfo())
	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
	t.Logf("tx submitted: %s, %+v", txID, rcpt)

	require.Eventually(t, func() bool {
		dataEnv, err = c.Servers[leaderRound4].QueryData(t, worldstate.DefaultDBName, "dan")
		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
	}, 30*time.Second, 100*time.Millisecond)

	dataResp = dataEnv.GetResponse().GetValue()
	require.Equal(t, []byte("dan-data"), dataResp)
	t.Logf("data: %+v", string(dataResp))

	require.Eventually(t, func() bool {
		return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, 0, 1, 2)
	}, 30*time.Second, 100*time.Millisecond)
}

// Scenario:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - submit 2 txs.
// - shutting down and starting all servers in the cluster.
// - make sure the servers are in sync with the txs made before they were shut down.
func TestClusterRestart(t *testing.T) {
	dir, err := ioutil.TempDir("", "int-test")
	require.NoError(t, err)

	nPort, pPort := getPorts(3)
	setupConfig := &setup.Config{
		NumberOfServers:     3,
		TestDirAbsolutePath: dir,
		BDBBinaryPath:       "../../bin/bdb",
		CmdTimeout:          10 * time.Second,
		BaseNodePort:        nPort,
		BasePeerPort:        pPort,
		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
		},
	}
	c, err := setup.NewCluster(setupConfig)
	require.NoError(t, err)
	defer c.ShutdownAndCleanup()

	require.NoError(t, c.Start())

	//find the leader
	leaderIndex := -1
	require.Eventually(t, func() bool {
		leaderIndex = c.AgreedLeader(t, 0, 1, 2)
		return leaderIndex >= 0
	}, 30*time.Second, 100*time.Millisecond)

	// Submit one valid tx per key before the restart.
	keys := []string{"alice", "bob"}
	for _, key := range keys {
		txID, rcpt, err := c.Servers[leaderIndex].WriteDataTx(t, worldstate.DefaultDBName, key, []byte(key+"-data"))
		require.NoError(t, err)
		require.NotNil(t, rcpt)
		require.NotEmpty(t, txID)
		require.NotEmpty(t, rcpt.GetHeader().GetValidationInfo())
		require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
		t.Logf("tx submitted: %s, %+v", txID, rcpt)
	}

	require.NoError(t, c.Restart())

	//find the new leader
	newLeader := -1
	require.Eventually(t, func() bool {
		newLeader = c.AgreedLeader(t, 0, 1, 2)
		return newLeader >= 0
	}, 30*time.Second, 100*time.Millisecond)

	//make sure all 3 nodes are active
	require.Eventually(t, func() bool {
		clusterStatusResEnv, err := c.Servers[newLeader].QueryClusterStatus(t)
		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
	}, 30*time.Second, 100*time.Millisecond)

	// Height 3 = config block + the 2 data txs submitted above.
	require.Eventually(t, func() bool {
		return c.AgreedHeight(t, 3, 0, 1, 2)
	}, 30*time.Second, 100*time.Millisecond)

	// Verify the pre-restart writes survived the restart.
	for _, key := range keys {
		key := key // capture per iteration for the closure (pre-Go 1.22 semantics)
		expected := key + "-data"
		require.Eventually(t, func() bool {
			// Compare inside the polling closure without require: testify runs
			// the Eventually condition in its own goroutine, where FailNow is unsafe.
			dataEnv, err := c.Servers[newLeader].QueryData(t, worldstate.DefaultDBName, key)
			if err != nil || dataEnv == nil {
				return false
			}
			dataVal := dataEnv.GetResponse().GetValue()
			if dataVal == nil {
				return false
			}
			t.Logf("data: %+v", dataVal)
			return string(dataVal) == expected
		}, 30*time.Second, 100*time.Millisecond)
	}
}

// Scenario:
// - start 3 servers in a cluster.
// - wait for one to be the leader.
// - stop the leader.
// - wait for new server to be the leader.
// - the leader submit a tx.
// - repeat until each server was a leader at least once.
// - make sure each node submitted a valid tx as a leader.
func TestAllNodesGetLeadership(t *testing.T) {
	dir, err := ioutil.TempDir("", "int-test")
	require.NoError(t, err)

	nPort, pPort := getPorts(3)
	setupConfig := &setup.Config{
		NumberOfServers:     3,
		TestDirAbsolutePath: dir,
		BDBBinaryPath:       "../../bin/bdb",
		CmdTimeout:          10 * time.Second,
		BaseNodePort:        nPort,
		BasePeerPort:        pPort,
		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
		},
	}
	c, err := setup.NewCluster(setupConfig)
	require.NoError(t, err)
	defer c.ShutdownAndCleanup()

	require.NoError(t, c.Start())

	// leaders[i] becomes true once server i has been leader at least once.
	leaders := [3]bool{false, false, false}

	currentLeader := -1
	require.Eventually(t, func() bool {
		currentLeader = c.AgreedLeader(t, 0, 1, 2)
		return currentLeader >= 0
	}, 30*time.Second, 100*time.Millisecond)

	// Rotate leadership: each leader writes key strconv.Itoa(leader) with value
	// []byte{leader}, then is shut down; the previously stopped node is restarted
	// so exactly 2 nodes (a quorum) are up while the next leader is elected.
	txsCount := 0
	oldLeader := -1
	for !leaders[0] || !leaders[1] || !leaders[2] {
		leaders[currentLeader] = true
		txID, rcpt, err := c.Servers[currentLeader].WriteDataTx(t, worldstate.DefaultDBName, strconv.Itoa(currentLeader), []byte{uint8(currentLeader)})
		require.NoError(t, err)
		require.NotNil(t, rcpt)
		require.NotEmpty(t, txID)
		require.NotEmpty(t, rcpt.GetHeader().GetValidationInfo())
		require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
		t.Logf("tx submitted: %s, %+v", txID, rcpt)
		txsCount++

		require.NoError(t, c.ShutdownServer(c.Servers[currentLeader]))

		if oldLeader != -1 {
			require.NoError(t, c.StartServer(c.Servers[oldLeader]))
		}
		oldLeader = currentLeader
		require.Eventually(t, func() bool {
			currentLeader = c.AgreedLeader(t, (oldLeader+1)%3, (oldLeader+2)%3)
			return currentLeader >= 0
		}, 30*time.Second, 100*time.Millisecond)
	}

	// Bring the last stopped node back so the full cluster is running.
	require.NoError(t, c.StartServer(c.Servers[oldLeader]))
	require.Eventually(t, func() bool {
		currentLeader = c.AgreedLeader(t, 0, 1, 2)
		return currentLeader >= 0
	}, 30*time.Second, 100*time.Millisecond)

	//make sure all 3 nodes are active
	require.Eventually(t, func() bool {
		clusterStatusResEnv, err := c.Servers[currentLeader].QueryClusterStatus(t)
		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
	}, 30*time.Second, 100*time.Millisecond)

	// Height = config block + one tx per leadership round.
	require.Eventually(t, func() bool {
		return c.AgreedHeight(t, uint64(txsCount+1), 0, 1, 2)
	}, 30*time.Second, 100*time.Millisecond)

	// Every server must have committed its tx while it was the leader.
	for i := 0; i < 3; i++ {
		i := i // capture per iteration for the closure (pre-Go 1.22 semantics)
		require.Eventually(t, func() bool {
			// Compare inside the polling closure without require: testify runs
			// the Eventually condition in its own goroutine, where FailNow is unsafe.
			dataEnv, err := c.Servers[currentLeader].QueryData(t, worldstate.DefaultDBName, strconv.Itoa(i))
			if err != nil || dataEnv == nil {
				return false
			}
			dataVal := dataEnv.GetResponse().GetValue()
			if dataVal == nil {
				return false
			}
			t.Logf("data: %+v", dataVal)
			return len(dataVal) == 1 && dataVal[0] == uint8(i)
		}, 30*time.Second, 100*time.Millisecond)
	}
}