Skip to content

Commit

Permalink
test: add an upgrade test to verify the replicationstatus is backward…
Browse files Browse the repository at this point in the history
… compatible

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Apr 29, 2022
1 parent ba90f81 commit 7dd3dad
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
11 changes: 11 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -86,6 +88,15 @@ func GetPrimaryPosition(t *testing.T, vttablet Vttablet, hostname string) (strin
return pos, gtID
}

// GetReplicationPosition gets the replication status of given vttablet
func GetReplicationPosition(t *testing.T, vttablet *Vttablet, hostname string) *replicationdatapb.Status {
ctx := context.Background()
vtablet := getTablet(vttablet.GrpcPort, hostname)
pos, err := tmClient.ReplicationStatus(ctx, vtablet)
require.NoError(t, err)
return pos
}

// VerifyRowsInTabletForTable verifies the total number of rows in a table.
// This is used to check that replication has caught up with the changes on primary.
func VerifyRowsInTabletForTable(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int, tableName string) {
Expand Down
41 changes: 41 additions & 0 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,44 @@ func TestReparentDoesntHangIfPrimaryFails(t *testing.T) {
require.Error(t, err)
assert.Contains(t, out, "primary failed to PopulateReparentJournal")
}

func TestReplicationStatus(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// Stop the SQL_THREAD on tablets[1]
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE SQL_THREAD;`)
require.NoError(t, err)
// Check the replication status on tablets[1] and assert that the IO thread is read to be running and SQL thread is stopped
replicationStatus := cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread := utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.False(t, sqlThread)

// Stop replication on tablets[1] and verify that both the threads are reported as not running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.False(t, sqlThread)

// Start replication on tablets[1] and verify that both the threads are reported as running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `START SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.True(t, sqlThread)

// Stop IO_THREAD on tablets[1] and verify that the IO thread is read to be stopped and SQL thread is running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.True(t, sqlThread)
}
36 changes: 36 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -744,3 +745,38 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
require.Equal(t, "No", res.Rows[0][11].ToString())
}
}

// ReplicationThreadsStatus returns the status of the IO and SQL thread. It reads the result of the replication status
// based on the vtctl major version provided. It also uses the vttabletVersion to assert on the expectation of the new fields
// being unknown for the old vttablets and that they match for the new vttablets
func ReplicationThreadsStatus(t *testing.T, status *replicationdatapb.Status, vtctlVersion, vttabletVersion int) (bool, bool) {
if vttabletVersion == 13 {
// If vttablet is version 13, then the new fields should be unknown
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
} else {
// For the new vttablet, the new parameters should not be unknown. Moreover, the old parameters should also be provided
// and should agree with the new ones
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
require.Equal(t, status.IoThreadRunning, mysql.ReplicationState(status.IoState) == mysql.ReplicationStateRunning)
require.Equal(t, status.SqlThreadRunning, mysql.ReplicationState(status.SqlState) == mysql.ReplicationStateRunning)
}

// if vtctlVersion provided is 13, then we should read the old parameters, since that is what old vtctl would do
if vtctlVersion == 13 {
return status.IoThreadRunning, status.SqlThreadRunning
}
// If we are at the latest vtctl version, we should read the latest parameters if provided otherwise the old ones
ioState := mysql.ReplicationState(status.IoState)
ioThread := status.IoThreadRunning
if ioState != mysql.ReplicationStateUnknown {
ioThread = ioState == mysql.ReplicationStateRunning
}
sqlState := mysql.ReplicationState(status.SqlState)
sqlThread := status.SqlThreadRunning
if sqlState != mysql.ReplicationStateUnknown {
sqlThread = sqlState == mysql.ReplicationStateRunning
}
return ioThread, sqlThread
}

0 comments on commit 7dd3dad

Please sign in to comment.