Skip to content

Commit

Permalink
Merge branch 'repl_state_transition' into test-together
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Apr 29, 2022
2 parents 57dd216 + cdca70e commit 96a0352
Show file tree
Hide file tree
Showing 73 changed files with 9,169 additions and 6,711 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/comment_pull_request.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: comment_pull_request
on:
pull_request_target:
types: [opened, reopened, ready_for_review]

permissions:
contents: write
pull-requests: write

jobs:
review_checklist:
if: ${{ !github.event.pull_request.draft }}
runs-on: ubuntu-latest
name: Comment Pull Request with the Review Checklist
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Comment PR
uses: thollander/actions-comment-pull-request@v1
with:
comment_includes: 'Review Checklist'
message: |
### Review Checklist
Hello reviewers! :wave: Please follow this checklist when reviewing this Pull Request.
#### General
- [ ] Ensure that the Pull Request has the correct `release notes` label. `release notes none` should only be used for PRs that are so trivial that they need not be included.
- [ ] If a new flag is being introduced, review whether it is really needed. The flag names should be clear and intuitive (as far as possible), and the flag's help should be descriptive.
#### Bug fixes
- [ ] There should be at least one unit or end-to-end test.
- [ ] The Pull Request description should either include a link to an issue that describes the bug OR an actual description of the bug and how to reproduce, along with a description of the fix.

#### Non-trivial changes
- [ ] There should be some code comments as to why things are implemented the way they are.

#### New/Existing features
- [ ] Should be documented, either by modifying the existing documentation or creating new documentation.
- [ ] New features should have a link to a feature request issue or an RFC that documents the use cases, corner cases and test cases.

#### Backward compatibility
- [ ] Protobuf changes should be wire-compatible.
- [ ] Changes to `_vt` tables and RPCs need to be backward compatible.
- [ ] `vtctl` command output order should be stable and `awk`-able.

GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
28 changes: 28 additions & 0 deletions docker/bootstrap/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,33 @@ List of changes between bootstrap image versions.
## [1] - 2020-12-18
### Changes
- Update bootstrap image to build binaries using golang 1.15

## [2] - 2021-06-21
### Changes
- Update build to golang 1.16

## [3] - 2021-08-18
### Changes
- Update build to golang 1.17

## [4] - 2022-01-18
### Changes
- Import new keys after Oracle rotated the PGP keys for their APT repository for MySQL 8.0+.

## [5] - 2022-03-15
### Changes
- Update build to golang 1.18

## [6] - 2022-04-15
### Changes
- Update build to golang 1.18.1

## [7] - N/A
- Skipped because new images did not change `bootstrap-version` in `test.go`

## [8] - 2022-04-22
### Changes
- Update versions of zookeeper and consul
- Get `make tools` working on M1 mac


43 changes: 41 additions & 2 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *ReplicationStatus) SQLHealthy() bool {

// ReplicationStatusToProto translates a Status to proto3.
func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
return &replicationdatapb.Status{
replstatuspb := &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: EncodePosition(s.FilePosition),
Expand All @@ -91,6 +91,21 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
SqlState: int32(s.SQLState),
LastSqlError: s.LastSQLError,
}

// We need to be able to send gRPC response messages from v14 and newer tablets to
// v13 and older clients. The older clients will not be processing the IoState or
// SqlState values in the message but instead looking at the IoThreadRunning and
// SqlThreadRunning booleans so we need to map and share this dual state.
// Note: v13 and older clients considered the IO thread state of connecting to
// be equal to running. That is why we do so here when mapping the states.
// This backwards compatibility can be removed in v15+.
if s.IOState == ReplicationStateRunning || s.IOState == ReplicationStateConnecting {
replstatuspb.IoThreadRunning = true
}
if s.SQLState == ReplicationStateRunning {
replstatuspb.SqlThreadRunning = true
}
return replstatuspb
}

// ProtoToReplicationStatus translates a proto Status, or panics.
Expand Down Expand Up @@ -118,7 +133,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
panic(vterrors.Wrapf(err, "cannot decode SourceUUID"))
}
}
return ReplicationStatus{
replstatus := ReplicationStatus{
Position: pos,
RelayLogPosition: relayPos,
FilePosition: filePos,
Expand All @@ -134,6 +149,30 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
SQLState: ReplicationState(s.SqlState),
LastSQLError: s.LastSqlError,
}

// We need to be able to process gRPC response messages from v13 and older tablets.
// In those cases there will be no value (unknown) for the IoState or SqlState but
// the message will have the IoThreadRunning and SqlThreadRunning booleans and we
// need to revert to our assumptions about a binary state as that's all the older
// tablet can provide (really only applicable to the IO status as that is NOT binary
// but rather has three states: Running, Stopped, Connecting).
// This backwards compatibility can be removed in v15+.
if replstatus.IOState == ReplicationStateUnknown {
if s.IoThreadRunning {
replstatus.IOState = ReplicationStateRunning
} else {
replstatus.IOState = ReplicationStateStopped
}
}
if replstatus.SQLState == ReplicationStateUnknown {
if s.SqlThreadRunning {
replstatus.SQLState = ReplicationStateRunning
} else {
replstatus.SQLState = ReplicationStateStopped
}
}

return replstatus
}

// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -86,6 +88,15 @@ func GetPrimaryPosition(t *testing.T, vttablet Vttablet, hostname string) (strin
return pos, gtID
}

// GetReplicationPosition gets the replication status of given vttablet
func GetReplicationPosition(t *testing.T, vttablet *Vttablet, hostname string) *replicationdatapb.Status {
ctx := context.Background()
vtablet := getTablet(vttablet.GrpcPort, hostname)
pos, err := tmClient.ReplicationStatus(ctx, vtablet)
require.NoError(t, err)
return pos
}

// VerifyRowsInTabletForTable verifies the total number of rows in a table.
// This is used to check that replication has caught up with the changes on primary.
func VerifyRowsInTabletForTable(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int, tableName string) {
Expand Down
41 changes: 41 additions & 0 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,44 @@ func TestReparentDoesntHangIfPrimaryFails(t *testing.T) {
require.Error(t, err)
assert.Contains(t, out, "primary failed to PopulateReparentJournal")
}

func TestReplicationStatus(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// Stop the SQL_THREAD on tablets[1]
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE SQL_THREAD;`)
require.NoError(t, err)
// Check the replication status on tablets[1] and assert that the IO thread is read to be running and SQL thread is stopped
replicationStatus := cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread := utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.False(t, sqlThread)

// Stop replication on tablets[1] and verify that both the threads are reported as not running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.False(t, sqlThread)

// Start replication on tablets[1] and verify that both the threads are reported as running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `START SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.True(t, sqlThread)

// Stop IO_THREAD on tablets[1] and verify that the IO thread is read to be stopped and SQL thread is running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationPosition(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.True(t, sqlThread)
}
36 changes: 36 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -744,3 +745,38 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
require.Equal(t, "No", res.Rows[0][11].ToString())
}
}

// ReplicationThreadsStatus returns the status of the IO and SQL thread. It reads the result of the replication status
// based on the vtctl major version provided. It also uses the vttabletVersion to assert on the expectation of the new fields
// being unknown for the old vttablets and that they match for the new vttablets
func ReplicationThreadsStatus(t *testing.T, status *replicationdatapb.Status, vtctlVersion, vttabletVersion int) (bool, bool) {
if vttabletVersion == 13 {
// If vttablet is version 13, then the new fields should be unknown
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
} else {
// For the new vttablet, the new parameters should not be unknown. Moreover, the old parameters should also be provided
// and should agree with the new ones
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
require.Equal(t, status.IoThreadRunning, mysql.ReplicationState(status.IoState) == mysql.ReplicationStateRunning)
require.Equal(t, status.SqlThreadRunning, mysql.ReplicationState(status.SqlState) == mysql.ReplicationStateRunning)
}

// if vtctlVersion provided is 13, then we should read the old parameters, since that is what old vtctl would do
if vtctlVersion == 13 {
return status.IoThreadRunning, status.SqlThreadRunning
}
// If we are at the latest vtctl version, we should read the latest parameters if provided otherwise the old ones
ioState := mysql.ReplicationState(status.IoState)
ioThread := status.IoThreadRunning
if ioState != mysql.ReplicationStateUnknown {
ioThread = ioState == mysql.ReplicationStateRunning
}
sqlState := mysql.ReplicationState(status.SqlState)
sqlThread := status.SqlThreadRunning
if sqlState != mysql.ReplicationStateUnknown {
sqlThread = sqlState == mysql.ReplicationStateRunning
}
return ioThread, sqlThread
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func validateDiff(t *testing.T, fromCreateTable string, toCreateTable string, hi
// The diff can be empty or there can be an actual ALTER TABLE statement
diffedAlterQuery := ""
if diff != nil && !diff.IsEmpty() {
diffedAlterQuery = sqlparser.String(diff.Statement())
diffedAlterQuery = diff.CanonicalStatementString()
}

// Validate the diff! The way we do it is:
Expand Down
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 da
create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead`" + `));
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at)) CHARSET=utf8mb4;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id));
`

// These should always be ignored in vreplication
Expand Down Expand Up @@ -60,7 +61,8 @@ create table db_order_test (c_uuid varchar(64) not null default '', created_at d
},
"Lead": {},
"Lead-1": {},
"db_order_test": {}
"db_order_test": {},
"datze": {}
}
}
`
Expand Down Expand Up @@ -127,6 +129,14 @@ create table db_order_test (c_uuid varchar(64) not null default '', created_at d
"name": "xxhash"
}
]
},
"datze": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}
Expand Down
Loading

0 comments on commit 96a0352

Please sign in to comment.