diff --git a/test/cluster/basic_test.go b/test/cluster/basic_test.go
index 4166a41f..055e9aac 100644
--- a/test/cluster/basic_test.go
+++ b/test/cluster/basic_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -290,3 +291,348 @@ func TestNodeRecoveryLeader(t *testing.T) {
 		return c.AgreedHeight(t, 5, 0, 1, 2)
 	}, 30*time.Second, 100*time.Millisecond)
 }
+
+// Scenario:
+// round 1:
+// - start 3 servers in a cluster.
+// - wait for one of them to become the leader.
+// - submit a tx.
+// - stop the leader.
+// round 2:
+// - find leader2.
+// - shut down the remaining follower.
+// - submit a tx to the one remaining server.
+// - there is no majority to elect a leader => the tx fails.
+// - restart one server so that there is again a majority to elect a leader.
+// round 3:
+// - find leader3.
+// - submit a tx => the tx is accepted.
+// - restart the third server => all 3 nodes are active.
+// round 4:
+// - find leader4.
+// - submit a tx => the tx is accepted.
+func TestNoMajorityToChooseLeader(t *testing.T) {
+	dir, err := ioutil.TempDir("", "int-test")
+	require.NoError(t, err)
+
+	nPort, pPort := getPorts(3)
+	setupConfig := &setup.Config{
+		NumberOfServers:     3,
+		TestDirAbsolutePath: dir,
+		BDBBinaryPath:       "../../bin/bdb",
+		CmdTimeout:          10 * time.Second,
+		BaseNodePort:        nPort,
+		BasePeerPort:        pPort,
+		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
+			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
+		},
+	}
+	c, err := setup.NewCluster(setupConfig)
+	require.NoError(t, err)
+	defer c.ShutdownAndCleanup()
+
+	require.NoError(t, c.Start())
+
+	leaderRound1 := -1
+	require.Eventually(t, func() bool {
+		leaderRound1 = c.AgreedLeader(t, 0, 1, 2)
+		return leaderRound1 >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	txID, rcpt, err := c.Servers[leaderRound1].WriteDataTx(t, worldstate.DefaultDBName, "alice", []byte("alice-data"))
+	require.NoError(t, err)
+	require.NotNil(t, rcpt)
+	require.True(t, txID != "")
+	require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
+	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
+	t.Logf("tx submitted: %s, %+v", txID, rcpt)
+
+	var dataEnv *types.GetDataResponseEnvelope
+	require.Eventually(t, func() bool {
+		dataEnv, err = c.Servers[leaderRound1].QueryData(t, worldstate.DefaultDBName, "alice")
+		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
+	}, 30*time.Second, 100*time.Millisecond)
+	dataResp := dataEnv.GetResponse().GetValue()
+	require.Equal(t, []byte("alice-data"), dataResp)
+	t.Logf("data: %+v", string(dataResp))
+
+	require.Eventually(t, func() bool {
+		return c.AgreedHeight(t, 2, 0, 1, 2)
+	}, 30*time.Second, 100*time.Millisecond)
+
+	follower1 := (leaderRound1 + 1) % 3
+	follower2 := (leaderRound1 + 2) % 3
+
+	require.NoError(t, c.ShutdownServer(c.Servers[leaderRound1]))
+
+	// find the new leader and make sure there are only 2 active nodes
+	leaderRound2 := -1
+	require.Eventually(t, func() bool {
+		leaderRound2 = c.AgreedLeader(t, follower1, follower2)
+		return leaderRound2 >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	t.Logf("Stopped node %d, new leader index is: %d; 2-node quorum", leaderRound1, leaderRound2)
+
+	// follower1Round2 is the follower still running; follower2Round2 is the node stopped in round 1
+	follower1Round2 := follower1
+	follower2Round2 := leaderRound1
+	if leaderRound2 == follower1 {
+		follower1Round2 = follower2
+	}
+
+	// servers alive: leaderRound2, follower1Round2
+	require.Eventually(t, func() bool {
+		clusterStatusResEnv, err := c.Servers[leaderRound2].QueryClusterStatus(t)
+		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 2
+	}, 30*time.Second, 100*time.Millisecond)
+
+	require.NoError(t, c.ShutdownServer(c.Servers[follower1Round2]))
+
+	// only one server is active now => no majority to elect a leader
+	require.Eventually(t, func() bool {
+		_, _, err = c.Servers[leaderRound2].WriteDataTx(t, worldstate.DefaultDBName, "bob", []byte("bob-data"))
+		return err != nil && err.Error() == "failed to submit transaction, server returned: status: 503 Service Unavailable, message: Cluster leader unavailable"
+	}, 60*time.Second, 100*time.Millisecond)
+
+	// restart one server => 2 servers are active: leaderRound2, follower1Round2
+	require.NoError(t, c.StartServer(c.Servers[follower1Round2]))
+
+	// find the new leader
+	leaderRound3 := -1
+	require.Eventually(t, func() bool {
+		leaderRound3 = c.AgreedLeader(t, leaderRound2, follower1Round2)
+		return leaderRound3 >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	t.Logf("Started node %d, leader index is: %d; 2-node quorum", follower1Round2, leaderRound3)
+
+	// follower1Round3 is the live non-leader node
+	follower1Round3 := leaderRound2
+	if leaderRound3 == leaderRound2 {
+		follower1Round3 = follower1Round2
+	}
+
+	txID, rcpt, err = c.Servers[leaderRound3].WriteDataTx(t, worldstate.DefaultDBName, "charlie", []byte("charlie-data"))
+	require.NoError(t, err)
+	require.NotNil(t, rcpt)
+	require.True(t, txID != "")
+	require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
+	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
+	t.Logf("tx submitted: %s, %+v", txID, rcpt)
+
+	require.Eventually(t, func() bool {
+		dataEnv, err = c.Servers[leaderRound3].QueryData(t, worldstate.DefaultDBName, "charlie")
+		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
+	}, 30*time.Second, 100*time.Millisecond)
+
+	dataResp = dataEnv.GetResponse().GetValue()
+	require.Equal(t, []byte("charlie-data"), dataResp)
+	t.Logf("data: %+v", string(dataResp))
+
+	require.Eventually(t, func() bool {
+		return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, leaderRound3, follower1Round3)
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// restart the third node => all 3 nodes are active
+	require.NoError(t, c.StartServer(c.Servers[follower2Round2]))
+
+	// find the new leader
+	leaderRound4 := -1
+	require.Eventually(t, func() bool {
+		leaderRound4 = c.AgreedLeader(t, 0, 1, 2)
+		return leaderRound4 >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	t.Logf("Started node %d, leader index is: %d; all 3 nodes are up", follower2Round2, leaderRound4)
+
+	txID, rcpt, err = c.Servers[leaderRound4].WriteDataTx(t, worldstate.DefaultDBName, "dan", []byte("dan-data"))
+	require.NoError(t, err)
+	require.NotNil(t, rcpt)
+	require.True(t, txID != "")
+	require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
+	require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
+	t.Logf("tx submitted: %s, %+v", txID, rcpt)
+
+	require.Eventually(t, func() bool {
+		dataEnv, err = c.Servers[leaderRound4].QueryData(t, worldstate.DefaultDBName, "dan")
+		return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
+	}, 30*time.Second, 100*time.Millisecond)
+
+	dataResp = dataEnv.GetResponse().GetValue()
+	require.Equal(t, []byte("dan-data"), dataResp)
+	t.Logf("data: %+v", string(dataResp))
+
+	require.Eventually(t, func() bool {
+		return c.AgreedHeight(t, rcpt.Header.BaseHeader.Number, 0, 1, 2)
+	}, 30*time.Second, 100*time.Millisecond)
+}
+
+// Scenario:
+// - start 3 servers in a cluster.
+// - wait for one of them to become the leader.
+// - submit 2 txs.
+// - shut down and restart all servers in the cluster.
+// - make sure the servers are in sync with the txs made before they were shut down.
+func TestClusterRestart(t *testing.T) {
+	dir, err := ioutil.TempDir("", "int-test")
+	require.NoError(t, err)
+
+	nPort, pPort := getPorts(3)
+	setupConfig := &setup.Config{
+		NumberOfServers:     3,
+		TestDirAbsolutePath: dir,
+		BDBBinaryPath:       "../../bin/bdb",
+		CmdTimeout:          10 * time.Second,
+		BaseNodePort:        nPort,
+		BasePeerPort:        pPort,
+		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
+			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
+		},
+	}
+	c, err := setup.NewCluster(setupConfig)
+	require.NoError(t, err)
+	defer c.ShutdownAndCleanup()
+
+	require.NoError(t, c.Start())
+
+	// find the leader
+	leaderIndex := -1
+	require.Eventually(t, func() bool {
+		leaderIndex = c.AgreedLeader(t, 0, 1, 2)
+		return leaderIndex >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	keys := []string{"alice", "bob"}
+	for _, key := range keys {
+		txID, rcpt, err := c.Servers[leaderIndex].WriteDataTx(t, worldstate.DefaultDBName, key, []byte(key+"-data"))
+		require.NoError(t, err)
+		require.NotNil(t, rcpt)
+		require.True(t, txID != "")
+		require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
+		require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
+		t.Logf("tx submitted: %s, %+v", txID, rcpt)
+	}
+
+	require.NoError(t, c.Restart())
+
+	// find the new leader
+	newLeader := -1
+	require.Eventually(t, func() bool {
+		newLeader = c.AgreedLeader(t, 0, 1, 2)
+		return newLeader >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// make sure all 3 nodes are active
+	require.Eventually(t, func() bool {
+		clusterStatusResEnv, err := c.Servers[newLeader].QueryClusterStatus(t)
+		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// expected height 3: the genesis (config) block plus the 2 data txs
+	require.Eventually(t, func() bool {
+		return c.AgreedHeight(t, 3, 0, 1, 2)
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// verify the data written before the restart is readable after it
+	for _, key := range keys {
+		require.Eventually(t, func() bool {
+			dataEnv, err := c.Servers[newLeader].QueryData(t, worldstate.DefaultDBName, key)
+			if dataEnv != nil && dataEnv.GetResponse().GetValue() != nil {
+				dataVal := dataEnv.GetResponse().GetValue()
+				require.Equal(t, []byte(key+"-data"), dataVal)
+				t.Logf("data: %+v", dataVal)
+			}
+			return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
+		}, 30*time.Second, 100*time.Millisecond)
+	}
+}
+
+// Scenario:
+// - start 3 servers in a cluster.
+// - wait for one of them to become the leader.
+// - stop the leader.
+// - wait for a new server to become the leader.
+// - the new leader submits a tx.
+// - repeat until each server has been the leader at least once.
+// - make sure each node submitted a valid tx as a leader.
+func TestAllNodesGetLeadership(t *testing.T) {
+	dir, err := ioutil.TempDir("", "int-test")
+	require.NoError(t, err)
+
+	nPort, pPort := getPorts(3)
+	setupConfig := &setup.Config{
+		NumberOfServers:     3,
+		TestDirAbsolutePath: dir,
+		BDBBinaryPath:       "../../bin/bdb",
+		CmdTimeout:          10 * time.Second,
+		BaseNodePort:        nPort,
+		BasePeerPort:        pPort,
+		CheckRedirectFunc: func(req *http.Request, via []*http.Request) error {
+			return errors.Errorf("Redirect blocked in test client: url: '%s', referrer: '%s', #via: %d", req.URL, req.Referer(), len(via))
+		},
+	}
+	c, err := setup.NewCluster(setupConfig)
+	require.NoError(t, err)
+	defer c.ShutdownAndCleanup()
+
+	require.NoError(t, c.Start())
+
+	leaders := [3]bool{false, false, false}
+
+	currentLeader := -1
+	require.Eventually(t, func() bool {
+		currentLeader = c.AgreedLeader(t, 0, 1, 2)
+		return currentLeader >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	txsCount := 0
+	oldLeader := -1
+	for !leaders[0] || !leaders[1] || !leaders[2] {
+		leaders[currentLeader] = true
+		// each leader writes its own index as both key and value
+		txID, rcpt, err := c.Servers[currentLeader].WriteDataTx(t, worldstate.DefaultDBName, strconv.Itoa(currentLeader), []byte{uint8(currentLeader)})
+		require.NoError(t, err)
+		require.NotNil(t, rcpt)
+		require.True(t, txID != "")
+		require.True(t, len(rcpt.GetHeader().GetValidationInfo()) > 0)
+		require.Equal(t, types.Flag_VALID, rcpt.Header.ValidationInfo[rcpt.TxIndex].Flag)
+		t.Logf("tx submitted: %s, %+v", txID, rcpt)
+		txsCount++
+
+		require.NoError(t, c.ShutdownServer(c.Servers[currentLeader]))
+
+		// restart the previously stopped node so a 2-node quorum remains
+		if oldLeader != -1 {
+			require.NoError(t, c.StartServer(c.Servers[oldLeader]))
+		}
+		oldLeader = currentLeader
+		require.Eventually(t, func() bool {
+			currentLeader = c.AgreedLeader(t, (oldLeader+1)%3, (oldLeader+2)%3)
+			return currentLeader >= 0
+		}, 30*time.Second, 100*time.Millisecond)
+	}
+
+	require.NoError(t, c.StartServer(c.Servers[oldLeader]))
+	require.Eventually(t, func() bool {
+		currentLeader = c.AgreedLeader(t, 0, 1, 2)
+		return currentLeader >= 0
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// make sure all 3 nodes are active
+	require.Eventually(t, func() bool {
+		clusterStatusResEnv, err := c.Servers[currentLeader].QueryClusterStatus(t)
+		return err == nil && clusterStatusResEnv != nil && len(clusterStatusResEnv.GetResponse().GetActive()) == 3
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// expected height: one block per tx plus the genesis (config) block
+	require.Eventually(t, func() bool {
+		return c.AgreedHeight(t, uint64(txsCount+1), 0, 1, 2)
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// make sure each node submitted a valid tx as a leader
+	for i := 0; i < 3; i++ {
+		require.Eventually(t, func() bool {
+			dataEnv, err := c.Servers[currentLeader].QueryData(t, worldstate.DefaultDBName, strconv.Itoa(i))
+			if dataEnv != nil && dataEnv.GetResponse().GetValue() != nil {
+				dataVal := dataEnv.GetResponse().GetValue()
+				require.Equal(t, []byte{uint8(i)}, dataVal)
+				t.Logf("data: %+v", dataVal)
+			}
+			return dataEnv != nil && dataEnv.GetResponse().GetValue() != nil && err == nil
+		}, 30*time.Second, 100*time.Millisecond)
+	}
+}