From 5cb66a1797a17c05b447acda5f923c62e5912b27 Mon Sep 17 00:00:00 2001
From: Florent Poinsard <35779988+frouioui@users.noreply.github.com>
Date: Tue, 20 Aug 2024 14:15:33 -0600
Subject: [PATCH 1/6] Remove mysql57/percona57 bootstrap images (#16620)

Signed-off-by: Florent Poinsard
---
 Makefile                              |  2 +-
 docker/bootstrap/CHANGELOG.md         |  3 ++-
 docker/bootstrap/Dockerfile.mysql57   | 24 ------------------------
 docker/bootstrap/Dockerfile.percona57 | 21 ---------------------
 docker/bootstrap/README.md            |  2 --
 test.go                               |  2 +-
 6 files changed, 4 insertions(+), 50 deletions(-)
 delete mode 100644 docker/bootstrap/Dockerfile.mysql57
 delete mode 100644 docker/bootstrap/Dockerfile.percona57

diff --git a/Makefile b/Makefile
index 219139af796..997eec5db12 100644
--- a/Makefile
+++ b/Makefile
@@ -284,7 +284,7 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto
 # Please read docker/README.md to understand the different available images.
 
 # This rule builds the bootstrap images for all flavors.
-DOCKER_IMAGES_FOR_TEST = mysql57 mysql80 percona57 percona80
+DOCKER_IMAGES_FOR_TEST = mysql80 percona80
 DOCKER_IMAGES = common $(DOCKER_IMAGES_FOR_TEST)
 BOOTSTRAP_VERSION=35
 ensure_bootstrap_version:
diff --git a/docker/bootstrap/CHANGELOG.md b/docker/bootstrap/CHANGELOG.md
index ee400664a8b..0ecd5ede02f 100644
--- a/docker/bootstrap/CHANGELOG.md
+++ b/docker/bootstrap/CHANGELOG.md
@@ -136,4 +136,5 @@ List of changes between bootstrap image versions.
 
 ## [35] - 2024-08-14
 ### Changes
-- Update build to golang 1.23.0
\ No newline at end of file
+- Update build to golang 1.23.0
+- The MySQL57 and Percona57 tags are removed from this version onwards
\ No newline at end of file
diff --git a/docker/bootstrap/Dockerfile.mysql57 b/docker/bootstrap/Dockerfile.mysql57
deleted file mode 100644
index b840a7b8153..00000000000
--- a/docker/bootstrap/Dockerfile.mysql57
+++ /dev/null
@@ -1,24 +0,0 @@
-ARG bootstrap_version
-ARG image="vitess/bootstrap:${bootstrap_version}-common"
-
-FROM --platform=linux/amd64 "${image}"
-
-USER root
-
-# Install MySQL 5.7
-RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends gnupg dirmngr ca-certificates && \
-    for i in $(seq 1 10); do apt-key adv --no-tty --recv-keys --keyserver keyserver.ubuntu.com A8D3785C && break; done && \
-    add-apt-repository 'deb http://repo.mysql.com/apt/debian/ buster mysql-5.7' && \
-    for i in $(seq 1 10); do apt-key adv --no-tty --keyserver keyserver.ubuntu.com --recv-keys 9334A25F8507EFA5 && break; done && \
-    echo 'deb http://repo.percona.com/apt buster main' > /etc/apt/sources.list.d/percona.list && \
-    { \
-        echo debconf debconf/frontend select Noninteractive; \
-        echo percona-server-server-5.7 percona-server-server/root_password password 'unused'; \
-        echo percona-server-server-5.7 percona-server-server/root_password_again password 'unused'; \
-    } | debconf-set-selections && \
-    apt-get update -y && apt-get install -y percona-release && \
-    percona-release enable-only tools && apt-get update -y && \
-    DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev libdbd-mysql-perl rsync libev4 percona-xtrabackup-24 && \
-    rm -rf /var/lib/apt/lists/*
-
-USER vitess
diff --git a/docker/bootstrap/Dockerfile.percona57 b/docker/bootstrap/Dockerfile.percona57
deleted file mode 100644
index 28a1cd3cb18..00000000000
--- a/docker/bootstrap/Dockerfile.percona57
+++ /dev/null
@@ -1,21 +0,0 @@
-ARG bootstrap_version
-ARG image="vitess/bootstrap:${bootstrap_version}-common"
-
-FROM --platform=linux/amd64 "${image}"
-
-USER root
-
-# Install Percona 5.7
-RUN for i in $(seq 1 10); do apt-key adv --no-tty --keyserver keyserver.ubuntu.com --recv-keys 9334A25F8507EFA5 && break; done && \
-    add-apt-repository 'deb http://repo.percona.com/apt bullseye main' && \
-    { \
-        echo debconf debconf/frontend select Noninteractive; \
-        echo percona-server-server-5.7 percona-server-server/root_password password 'unused'; \
-        echo percona-server-server-5.7 percona-server-server/root_password_again password 'unused'; \
-    } | debconf-set-selections && \
-    apt-get update -y && \
-    apt-get install -y --no-install-recommends percona-server-server-5.7 && \
-    apt-get install -y --no-install-recommends libperconaserverclient20-dev percona-xtrabackup-24 && \
-    rm -rf /var/lib/apt/lists/*
-
-USER vitess
diff --git a/docker/bootstrap/README.md b/docker/bootstrap/README.md
index 717f4336442..b273305d6b9 100644
--- a/docker/bootstrap/README.md
+++ b/docker/bootstrap/README.md
@@ -6,9 +6,7 @@ after successfully running `bootstrap.sh` and `dev.env`.
 
 The `vitess/bootstrap` image comes in different flavors:
 * `vitess/bootstrap:common` - dependencies that are common to all flavors
-* `vitess/bootstrap:mysql57` - bootstrap image for MySQL 5.7
 * `vitess/bootstrap:mysql80` - bootstrap image for MySQL 8.0
-* `vitess/bootstrap:percona57` - bootstrap image for Percona Server 5.7
 * `vitess/bootstrap:percona80` - bootstrap image for Percona Server 8.0
 
 **NOTE: Unlike the base image that builds Vitess itself, this bootstrap image
diff --git a/test.go b/test.go
index 448dd33d0f9..30764662d33 100755
--- a/test.go
+++ b/test.go
@@ -112,7 +112,7 @@ const (
 	configFileName = "test/config.json"
 
 	// List of flavors for which a bootstrap Docker image is available.
-	flavors = "mysql57,mysql80,percona,percona57,percona80"
+	flavors = "mysql80,percona80"
 )
 
 // Config is the overall object serialized in test/config.json.
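After this patch, `mysql57` and `percona57` are no longer valid bootstrap flavors anywhere a flavor is chosen from the `flavors` constant in test.go. A minimal, self-contained Go sketch of the membership check this constant implies (the `isSupportedFlavor` helper is hypothetical and only illustrative; it is not part of test.go):

```go
package main

import (
	"fmt"
	"strings"
)

// Mirrors the trimmed constant in test.go after this patch.
const flavors = "mysql80,percona80"

// isSupportedFlavor reports whether a bootstrap Docker image is still
// built for the given flavor. Hypothetical helper, shown for illustration.
func isSupportedFlavor(flavor string) bool {
	for _, f := range strings.Split(flavors, ",") {
		if f == flavor {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isSupportedFlavor("mysql80")) // true
	fmt.Println(isSupportedFlavor("mysql57")) // false: bootstrap image removed
}
```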
From 471ab1a20a1f7f1f333ddd378b3edc71ad6de7a3 Mon Sep 17 00:00:00 2001
From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
Date: Wed, 21 Aug 2024 12:19:55 +0530
Subject: [PATCH 2/6] Atomic Transactions correctness with PRS, ERS and MySQL & Vttablet Restarts (#16553)

Signed-off-by: Manan Gupta
Signed-off-by: Harshit Gangal
Co-authored-by: Harshit Gangal
---
 .../AtomicTransactionsWithDisruptions.md      |  37 +++
 go/test/endtoend/transaction/twopc/main_test.go | 1 +
 go/test/endtoend/transaction/twopc/schema.sql |  19 +-
 .../twopc/{fuzzer => stress}/fuzzer_test.go   | 159 +++++++++++-
 .../twopc/{fuzzer => stress}/main_test.go     |  14 +-
 .../twopc/{fuzzer => stress}/schema.sql       |   6 +
 .../transaction/twopc/stress/stress_test.go   | 229 ++++++++++++++++++
 .../twopc/{fuzzer => stress}/vschema.json     |   8 +
 .../endtoend/transaction/twopc/twopc_test.go  | 137 +----------
 .../endtoend/transaction/twopc/utils/utils.go |  32 ++-
 go/vt/vttablet/endtoend/framework/client.go   |   2 -
 go/vt/vttablet/tabletmanager/rpc_actions.go   |  13 +
 .../vttablet/tabletmanager/rpc_replication.go |   9 +-
 go/vt/vttablet/tabletmanager/tm_init.go       |  19 ++
 go/vt/vttablet/tabletmanager/tm_state.go      |   3 +-
 go/vt/vttablet/tabletserver/controller.go     |   2 +
 go/vt/vttablet/tabletserver/dt_executor.go    |   6 +-
 .../tabletserver/query_executor_test.go       |   3 -
 go/vt/vttablet/tabletserver/state_manager.go  |   7 +
 .../tabletserver/state_manager_test.go        |   5 +
 .../tabletserver/stateful_connection.go       |   4 +-
 go/vt/vttablet/tabletserver/tabletserver.go   |  10 +-
 .../tabletserver/tabletserver_test.go         |  19 +-
 go/vt/vttablet/tabletserver/twopc.go          |   4 +-
 go/vt/vttablet/tabletserver/tx_engine.go      | 159 +++++++-----
 go/vt/vttablet/tabletserver/tx_prep_pool.go   |  31 ++-
 .../tabletserver/tx_prep_pool_test.go         |  18 +-
 go/vt/vttablet/tabletservermock/controller.go |   3 +
 test/config.json                              |   4 +-
 29 files changed, 709 insertions(+), 254 deletions(-)
 create mode 100644 doc/design-docs/AtomicTransactionsWithDisruptions.md
 rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/fuzzer_test.go (70%)
 rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/main_test.go (90%)
 rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/schema.sql (75%)
 create mode 100644 go/test/endtoend/transaction/twopc/stress/stress_test.go
 rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/vschema.json (74%)

diff --git a/doc/design-docs/AtomicTransactionsWithDisruptions.md b/doc/design-docs/AtomicTransactionsWithDisruptions.md
new file mode 100644
index 00000000000..706308b6b2b
--- /dev/null
+++ b/doc/design-docs/AtomicTransactionsWithDisruptions.md
@@ -0,0 +1,37 @@
+# Handling disruptions in atomic transactions
+
+## Overview
+
+This document describes how to make atomic transactions resilient in the face of disruptions. The basic design and components involved in an atomic transaction are described [here](./TwoPhaseCommitDesign.md). This document describes each of the disruptions that can happen in a running cluster and how atomic transactions are engineered to handle them without breaking their guarantee of being atomic.
+
+## `PlannedReparentShard` and `EmergencyReparentShard`
+
+For both Planned and Emergency reparents, we call `DemotePrimary` on the primary tablet. For a Planned reparent, this call has to succeed, while for an Emergency reparent, the call can fail if the primary is unreachable, and we still proceed further.
+
+As part of the `DemotePrimary` flow, when we transition the tablet to a non-serving state, we wait for all the transactions to have completed (in `TxEngine.shutdownLocked()` we have `te.txPool.WaitForEmpty()`). If the user has specified a shutdown grace period, then after that much time elapses, we go ahead and forcefully kill all running queries. We then also roll back the prepared transactions. It is crucial that we roll back the prepared transactions only after all other writes have been killed, because when we roll back a prepared transaction, it lets go of the locks it was holding. If some other conflicting write in progress hadn't been killed, it could potentially go through and cause data corruption, since we wouldn't be able to prepare the transaction again. All the code to kill queries can be found in `stateManager.terminateAllQueries()`.
+
+The steps outlined above ensure that we either wait for all prepared transactions to conclude or roll them back safely so that they can be prepared again on the new primary.
+
+On the new primary, when we call `PromoteReplica`, we redo all the prepared transactions before we allow any new writes to go through. This ensures that the new primary is in the same state as the old primary was before the reparent. The code for redoing the prepared transactions can be found in `TxEngine.RedoPreparedTransactions()`.
+
+If everything goes as described above, there is no reason for the redo of prepared transactions to fail. But in case something unexpected happens and preparing the transactions fails, we still allow the vttablet to accept new writes, because we decided that availability of the tablet is more important. We will, however, build tooling and metrics for users to be notified of these failures and let them handle them in the way they see fit.
+
+While a Planned reparent is an operation where all the processes are running fine, an Emergency reparent is called when something has gone wrong with the cluster. Because we call `DemotePrimary` in parallel with `StopReplicationAndBuildStatusMap`, we can run into a case wherein the primary tries to write something to the binlog after all the replicas have stopped replicating. If we were to run without semi-sync, the primary could potentially commit a prepared transaction and return success to the vtgate trying to commit this transaction. The vtgate could then conclude that the transaction is complete and remove all its metadata. However, on the new primary, since the transaction commit didn't get replicated, we would re-prepare the transaction and wait for a coordinator to either commit it or roll it back, but that would never happen. Essentially, we would have a transaction stuck in the prepared state on a shard indefinitely. To avoid this situation, it is essential that we run with semi-sync, because this ensures that any write acknowledged as a success to the caller has necessarily been replicated to at least one replica. This in turn ensures that the transaction will already be committed on the new primary.
+
+## MySQL Restarts
+
+When MySQL restarts, it loses all the ongoing transactions, which includes all the prepared transactions. This is because the transaction logs are not persistent across restarts. This is a MySQL limitation and there is no way to get around it. However, at the Vitess level we must ensure that we can commit the prepared transactions even across MySQL restarts, without any failures.
+
+Vttablet has code to detect MySQL failures and call `stateManager.checkMySQL()`, which transitions the tablet to a NotConnected state. This prevents any writes from going through until the vttablet has transitioned back to a serving state.
+
+However, we cannot rely on `checkMySQL` to ensure that no conflicting writes go through. This is because the time between the MySQL restart and the vttablet transitioning to a NotConnected state can be large. During this time, the vttablet would still be accepting writes, and some of them could potentially conflict with the prepared transactions.
+
+To handle this, we rely on the fact that when MySQL restarts, it starts with super-read-only turned on. This means that no writes can go through. It is VTOrc that registers this as an issue and fixes it by calling `UndoDemotePrimary`. As part of that call, before we set MySQL to read-write, we ensure that all the prepared transactions are redone in the read_only state. We use the dba pool (which has admin permissions) to prepare the transactions. This is safe because we know that no conflicting writes can go through until we set MySQL to read-write. The code to set MySQL to read-write after redoing prepared transactions can be found in `TabletManager.redoPreparedTransactionsAndSetReadWrite()`.
+
+Handling MySQL restarts is the only reason we needed to add the code to redo prepared transactions whenever MySQL transitions from super-read-only to read-write state. Even though we only need to do this in `UndoDemotePrimary`, it is not necessarily `UndoDemotePrimary` that sets MySQL to read-write. If the user notices that the tablet is in a read-only state before VTOrc has a chance to fix it, they can manually call `SetReadWrite` on the tablet.
+Therefore, the safest option was to always check whether we need to redo the prepared transactions whenever MySQL transitions from super-read-only to read-write state.
+
+## Vttablet Restarts
+
+When Vttablet restarts, all the previous connections are dropped. It starts in a non-serving state, and then, after reading the shard and tablet records from the topo, it transitions to a serving state.
+As part of this transition, we need to ensure that we redo the prepared transactions before we start accepting any writes. This is done as part of the `TxEngine.transition` function when we transition to the `AcceptingReadAndWrite` state. We call the same code for redoing the prepared transactions that we called for MySQL restarts, PRS, and ERS.
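The invariant the design doc keeps returning to is an ordering one: prepared transactions must be redone while MySQL still rejects application writes, and only afterwards may the server be opened for regular traffic. Below is a simplified sketch of that ordering, with trimmed-down hypothetical stand-ins for the real `MysqlDaemon` and tabletserver `Controller` interfaces (the real `SetSuperReadOnly` also returns a reset callback, elided here):

```go
package sketch

import "context"

// Hypothetical, trimmed stand-ins for the real MysqlDaemon and Controller interfaces.
type mysqlDaemon interface {
	SetSuperReadOnly(ctx context.Context, on bool) error
	SetReadOnly(ctx context.Context, on bool) error
}

type queryService interface {
	RedoPreparedTransactions()
}

// redoAndSetReadWrite mirrors the ordering of TabletManager.redoPreparedTransactionsAndSetReadWrite
// in the diff below: while MySQL is still read-only for applications, re-prepare the
// transactions from the redo log (the dba pool can write even then), and only afterwards
// allow regular writes. Reversing the order would let a conflicting write slip in before
// the prepared transactions have re-acquired their locks.
func redoAndSetReadWrite(ctx context.Context, mysqld mysqlDaemon, qsc queryService) error {
	// Leave super_read_only, but stay read_only: admin connections can write, apps cannot.
	if err := mysqld.SetSuperReadOnly(ctx, false); err != nil {
		return err
	}
	qsc.RedoPreparedTransactions()
	// Setting read_only OFF also clears super_read_only if it is still set.
	return mysqld.SetReadOnly(ctx, false)
}
```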
diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 4c5e2715563..9a46562d1c7 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) + cleanup(t) return conn, func() { conn.Close() diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index de9e3ef0656..7c289a03c2a 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -1,18 +1,21 @@ -create table twopc_user ( - id bigint, +create table twopc_user +( + id bigint, name varchar(64), primary key (id) ) Engine=InnoDB; -create table twopc_music ( - id varchar(64), +create table twopc_music +( + id varchar(64), user_id bigint, - title varchar(64), + title varchar(64), primary key (id) ) Engine=InnoDB; -create table twopc_t1 ( - id bigint, +create table twopc_t1 +( + id bigint, col bigint, - primary key (id, col) + primary key (id) ) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go similarity index 70% rename from go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go rename to go/test/endtoend/transaction/twopc/stress/fuzzer_test.go index ff440164042..e81d0d0d9ab 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go @@ -14,13 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fuzzer +package stress import ( "context" "fmt" + "os" + "path" + "strconv" + "strings" "sync" "sync/atomic" + "syscall" "testing" "time" @@ -28,6 +33,8 @@ import ( "golang.org/x/exp/rand" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/syscallutil" + "vitess.io/vitess/go/vt/log" ) var ( @@ -67,10 +74,12 @@ var ( // Moreover, the threadIDs of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order. func TestTwoPCFuzzTest(t *testing.T) { testcases := []struct { - name string - threads int - updateSets int - timeForTesting time.Duration + name string + threads int + updateSets int + timeForTesting time.Duration + clusterDisruptions []func() + disruptionProbability []int }{ { name: "Single Thread - Single Set", @@ -90,15 +99,24 @@ func TestTwoPCFuzzTest(t *testing.T) { updateSets: 15, timeForTesting: 5 * time.Second, }, + { + name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions", + threads: 15, + updateSets: 15, + timeForTesting: 5 * time.Second, + clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts}, + disruptionProbability: []int{5, 5, 5, 5}, + }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { conn, closer := start(t) defer closer() - fz := newFuzzer(tt.threads, tt.updateSets) + fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability) fz.initialize(t, conn) + conn.Close() // Start the fuzzer. fz.start(t) @@ -108,8 +126,12 @@ func TestTwoPCFuzzTest(t *testing.T) { // Signal the fuzzer to stop. fz.stop() + // Wait for all transactions to be resolved. 
+		waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
 		// Verify that all the transactions run were actually atomic and no data issues have occurred.
 		fz.verifyTransactionsWereAtomic(t)
+
+		log.Errorf("Verification complete. All good!")
 		})
 	}
 }
@@ -176,14 +198,20 @@ type fuzzer struct {
 	wg sync.WaitGroup
 	// updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment.
 	updateRowsVals [][]int
+	// clusterDisruptions are the cluster level disruptions that can happen in a running cluster.
+	clusterDisruptions []func()
+	// disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds.
+	disruptionProbability []int
 }

 // newFuzzer creates a new fuzzer struct.
-func newFuzzer(threads int, updateSets int) *fuzzer {
+func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer {
 	fz := &fuzzer{
-		threads:    threads,
-		updateSets: updateSets,
-		wg:         sync.WaitGroup{},
+		threads:               threads,
+		updateSets:            updateSets,
+		wg:                    sync.WaitGroup{},
+		clusterDisruptions:    clusterDisruptions,
+		disruptionProbability: disruptionProbability,
 	}
 	// Initially the fuzzer thread is stopped.
 	fz.shouldStop.Store(true)
@@ -202,12 +230,16 @@ func (fz *fuzzer) stop() {
 func (fz *fuzzer) start(t *testing.T) {
 	// We mark the fuzzer thread to be running now.
 	fz.shouldStop.Store(false)
-	fz.wg.Add(fz.threads)
+	// fz.threads is the count of fuzzer threads, and one disruption thread.
+	fz.wg.Add(fz.threads + 1)
 	for i := 0; i < fz.threads; i++ {
 		go func() {
 			fz.runFuzzerThread(t, i)
 		}()
 	}
+	go func() {
+		fz.runClusterDisruptionThread(t)
+	}()
 }

 // runFuzzerThread is used to run a thread of the fuzzer.
@@ -308,3 +340,108 @@ func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string {
 	})
 	return queries
 }
+
+// runClusterDisruptionThread runs the cluster level disruptions in a separate thread.
+func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) {
+	// Whenever we finish running this thread, we should mark that the thread has stopped.
+	defer func() {
+		fz.wg.Done()
+	}()
+
+	for {
+		// If the disruption thread is marked to be stopped, then we should exit this goroutine.
+		if fz.shouldStop.Load() == true {
+			return
+		}
+		// Run a potential disruption
+		fz.runClusterDisruption(t)
+		time.Sleep(100 * time.Millisecond)
+	}
+
+}
+
+// runClusterDisruption tries to run a single cluster disruption.
+func (fz *fuzzer) runClusterDisruption(t *testing.T) { + for idx, prob := range fz.disruptionProbability { + if rand.Intn(100) < prob { + fz.clusterDisruptions[idx]() + return + } + } +} + +/* +Cluster Level Disruptions for the fuzzer +*/ + +func prs() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + newPrimary := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Running PRS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias) + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) + if err != nil { + log.Errorf("error running PRS - %v", err) + } +} + +func ers() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + newPrimary := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Running ERS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias) + _, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias) + if err != nil { + log.Errorf("error running ERS - %v", err) + } +} + +func vttabletRestarts() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + tablet := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Restarting vttablet for - %v/%v - %v", keyspaceName, shard.Name, tablet.Alias) + err := tablet.VttabletProcess.TearDown() + if err != nil { + log.Errorf("error stopping vttablet - %v", err) + return + } + tablet.VttabletProcess.ServingStatus = "SERVING" + for { + err = tablet.VttabletProcess.Setup() + if err == nil { + return + } + // Sometimes vttablets fail to connect to the topo server due to a minor blip there. + // We don't want to fail the test, so we retry setting up the vttablet. + log.Errorf("error restarting vttablet - %v", err) + time.Sleep(1 * time.Second) + } +} + +func mysqlRestarts() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + tablet := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias) + pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID)) + pidBytes, err := os.ReadFile(pidFile) + if err != nil { + // We can't read the file which means the PID file does not exist + // The server must have stopped + return + } + pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes))) + if err != nil { + log.Errorf("Error in conversion to integer: %v", err) + return + } + err = syscallutil.Kill(pid, syscall.SIGKILL) + if err != nil { + log.Errorf("Error in killing process: %v", err) + } +} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go similarity index 90% rename from go/test/endtoend/transaction/twopc/fuzzer/main_test.go rename to go/test/endtoend/transaction/twopc/stress/main_test.go index e0affde186a..9c7ed28fa1a 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package fuzzer +package stress import ( "context" @@ -75,12 +75,13 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: SchemaSQL, - VSchema: VSchema, - SidecarDBName: sidecarDBName, + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + DurabilityPolicy: "semi_sync", } - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { return 1 } @@ -113,4 +114,5 @@ func cleanup(t *testing.T) { utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert") utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update") + utils.ClearOutTable(t, vtParams, "twopc_t1") } diff --git a/go/test/endtoend/transaction/twopc/fuzzer/schema.sql b/go/test/endtoend/transaction/twopc/stress/schema.sql similarity index 75% rename from go/test/endtoend/transaction/twopc/fuzzer/schema.sql rename to go/test/endtoend/transaction/twopc/stress/schema.sql index 290da808991..5173166bfd4 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/schema.sql +++ b/go/test/endtoend/transaction/twopc/stress/schema.sql @@ -12,3 +12,9 @@ create table twopc_fuzzer_insert ( key(col), primary key (id, col) ) Engine=InnoDB; + +create table twopc_t1 ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go new file mode 100644 index 00000000000..9912bdf6e19 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -0,0 +1,229 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stress + +import ( + "context" + "fmt" + "os" + "path" + "strconv" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/syscallutil" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +// TestDisruptions tests that atomic transactions persevere through various disruptions. 
+func TestDisruptions(t *testing.T) { + testcases := []struct { + disruptionName string + commitDelayTime string + disruption func() error + }{ + { + disruptionName: "No Disruption", + commitDelayTime: "1", + disruption: func() error { + return nil + }, + }, + { + disruptionName: "PlannedReparentShard", + commitDelayTime: "5", + disruption: prsShard3, + }, + { + disruptionName: "MySQL Restart", + commitDelayTime: "5", + disruption: mysqlRestartShard3, + }, + { + disruptionName: "Vttablet Restart", + commitDelayTime: "5", + disruption: vttabletRestartShard3, + }, + { + disruptionName: "EmergencyReparentShard", + commitDelayTime: "5", + disruption: ersShard3, + }, + } + for _, tt := range testcases { + t.Run(fmt.Sprintf("%s-%ss delay", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { + // Reparent all the shards to first tablet being the primary. + reparentToFirstTablet(t) + // cleanup all the old data. + conn, closer := start(t) + defer closer() + // Start an atomic transaction. + utils.Exec(t, conn, "begin") + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + idVals := []int{4, 6, 9} + for _, val := range idVals { + utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val)) + } + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime) + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to run the disruption. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + log.Errorf("Error in commit - %v", err) + } + }() + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + writeCtx, writeCancel := context.WithCancel(context.Background()) + var writerWg sync.WaitGroup + // Run multiple threads to try to write to the database on the same values of id to ensure that we don't + // allow any writes while the transaction is prepared and not committed. + for i := 0; i < 10; i++ { + writerWg.Add(1) + go func() { + defer writerWg.Done() + threadToWrite(t, writeCtx, idVals[i%3]) + }() + } + // Run the disruption. + err := tt.disruption() + require.NoError(t, err) + // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. + // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. + wg.Wait() + // Check the data in the table. + waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) + writeCancel() + writerWg.Wait() + }) + } +} + +// threadToWrite is a helper function to write to the database in a loop. 
+func threadToWrite(t *testing.T, ctx context.Context, id int) { + for { + select { + case <-ctx.Done(): + return + default: + } + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + continue + } + _, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000))) + } +} + +// reparentToFirstTablet reparents all the shards to first tablet being the primary. +func reparentToFirstTablet(t *testing.T) { + ks := clusterInstance.Keyspaces[0] + for _, shard := range ks.Shards { + primary := shard.Vttablets[0] + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, primary.Alias) + require.NoError(t, err) + } +} + +// waitForResults waits for the results of the query to be as expected. +func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { + timeout := time.After(waitTime) + var prevRes []sqltypes.Row + for { + select { + case <-timeout: + t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes) + default: + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err == nil { + res := utils.Exec(t, conn, query) + conn.Close() + prevRes = res.Rows + if fmt.Sprintf("%v", res.Rows) == resultExpected { + return + } + } + time.Sleep(100 * time.Millisecond) + } + } +} + +/* +Cluster Level Disruptions for the fuzzer +*/ + +// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. +func prsShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) +} + +// ersShard3 runs a ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. +func ersShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + _, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias) + return err +} + +// vttabletRestartShard3 restarts the first vttablet of the third shard. +func vttabletRestartShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + tablet := shard.Vttablets[0] + return tablet.RestartOnlyTablet() +} + +// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard. 
+func mysqlRestartShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + vttablets := shard.Vttablets + tablet := vttablets[0] + log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias) + pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID)) + pidBytes, err := os.ReadFile(pidFile) + if err != nil { + // We can't read the file which means the PID file does not exist + // The server must have stopped + return err + } + pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes))) + if err != nil { + return err + } + return syscallutil.Kill(pid, syscall.SIGKILL) +} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/vschema.json b/go/test/endtoend/transaction/twopc/stress/vschema.json similarity index 74% rename from go/test/endtoend/transaction/twopc/fuzzer/vschema.json rename to go/test/endtoend/transaction/twopc/stress/vschema.json index e3854f8f101..415b5958f54 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/vschema.json +++ b/go/test/endtoend/transaction/twopc/stress/vschema.json @@ -21,6 +21,14 @@ "name": "reverse_bits" } ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 5aab1f5a2e2..ce104fa94ec 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -20,8 +20,6 @@ import ( "context" _ "embed" "fmt" - "os" - "path" "reflect" "sort" "strings" @@ -35,19 +33,14 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) -const ( - DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" - DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" -) - // TestDTCommit tests distributed transaction commit for insert, update and delete operations // It verifies the binlog events for the same with transaction state changes and redo statements. func TestDTCommit(t *testing.T) { @@ -996,10 +989,10 @@ func TestReadingUnresolvedTransactions(t *testing.T) { utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, "5") - defer deleteFile(DebugDelayCommitTime) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, "5") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) // We will execute a commit in a go routine, because we know it will take some time to complete. // While the commit is ongoing, we would like to check that we see the unresolved transaction. 
var wg sync.WaitGroup @@ -1008,7 +1001,7 @@ func TestReadingUnresolvedTransactions(t *testing.T) { defer wg.Done() _, err := utils.ExecAllowError(t, conn, "commit") if err != nil { - log.Errorf("Error in commit - %v", err) + fmt.Println("Error in commit: ", err.Error()) } }() // Allow enough time for the commit to have started. @@ -1029,121 +1022,3 @@ func TestReadingUnresolvedTransactions(t *testing.T) { }) } } - -// TestDisruptions tests that atomic transactions persevere through various disruptions. -func TestDisruptions(t *testing.T) { - testcases := []struct { - disruptionName string - commitDelayTime string - disruption func() error - }{ - { - disruptionName: "No Disruption", - commitDelayTime: "1", - disruption: func() error { - return nil - }, - }, - { - disruptionName: "PlannedReparentShard", - commitDelayTime: "5", - disruption: prsShard3, - }, - } - for _, tt := range testcases { - t.Run(fmt.Sprintf("%s-%ss timeout", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { - // Reparent all the shards to first tablet being the primary. - reparentToFistTablet(t) - // cleanup all the old data. - conn, closer := start(t) - defer closer() - // Start an atomic transaction. - utils.Exec(t, conn, "begin") - // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits - // it is very easy to figure out what value will end up in which shard. - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") - // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, tt.commitDelayTime) - defer deleteFile(DebugDelayCommitTime) - // We will execute a commit in a go routine, because we know it will take some time to complete. - // While the commit is ongoing, we would like to run the disruption. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _, err := utils.ExecAllowError(t, conn, "commit") - if err != nil { - log.Errorf("Error in commit - %v", err) - } - }() - // Allow enough time for the commit to have started. - time.Sleep(1 * time.Second) - // Run the disruption. - err := tt.disruption() - require.NoError(t, err) - // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. - // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. - wg.Wait() - // Check the data in the table. - waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 10*time.Second) - }) - } -} - -// reparentToFistTablet reparents all the shards to first tablet being the primary. -func reparentToFistTablet(t *testing.T) { - ks := clusterInstance.Keyspaces[0] - for _, shard := range ks.Shards { - primary := shard.Vttablets[0] - err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, primary.Alias) - require.NoError(t, err) - } -} - -// writeTestCommunicationFile writes the content to the file with the given name. -// We use these files to coordinate with the vttablets running in the debug mode. 
-func writeTestCommunicationFile(t *testing.T, fileName string, content string) { - err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) - require.NoError(t, err) -} - -// deleteFile deletes the file specified. -func deleteFile(fileName string) { - _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) -} - -// waitForResults waits for the results of the query to be as expected. -func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { - timeout := time.After(waitTime) - for { - select { - case <-timeout: - t.Fatalf("didn't reach expected results for %s", query) - default: - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - res := utils.Exec(t, conn, query) - conn.Close() - if fmt.Sprintf("%v", res.Rows) == resultExpected { - return - } - time.Sleep(100 * time.Millisecond) - } - } -} - -/* -Cluster Level Disruptions for the fuzzer -*/ - -// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. -func prsShard3() error { - shard := clusterInstance.Keyspaces[0].Shards[2] - newPrimary := shard.Vttablets[1] - return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) -} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 7311375ee55..b3b8796accf 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -19,12 +19,19 @@ package utils import ( "context" "fmt" + "os" + "path" "testing" + "time" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/log" +) + +const ( + DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" + DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" ) // ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, @@ -33,12 +40,16 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { ctx := context.Background() for { conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) + if err != nil { + fmt.Printf("Error in connection - %v\n", err) + continue + } res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) if err != nil { - log.Errorf("Error in selecting - %v", err) + fmt.Printf("Error in selecting - %v\n", err) conn.Close() + time.Sleep(100 * time.Millisecond) continue } require.Len(t, res.Rows, 1) @@ -51,9 +62,22 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { } _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) if err != nil { - log.Errorf("Error in cleanup deletion - %v", err) + fmt.Printf("Error in cleanup deletion - %v\n", err) conn.Close() + time.Sleep(100 * time.Millisecond) continue } } } + +// WriteTestCommunicationFile writes the content to the file with the given name. +// We use these files to coordinate with the vttablets running in the debug mode. +func WriteTestCommunicationFile(t *testing.T, fileName string, content string) { + err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) + require.NoError(t, err) +} + +// DeleteFile deletes the file specified. 
+func DeleteFile(fileName string) { + _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) +} diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 1cbff71dc25..e4c2aa66066 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -187,8 +187,6 @@ func (client *QueryClient) UnresolvedTransactions() ([]*querypb.TransactionMetad // It currently supports only primary->replica and back. func (client *QueryClient) SetServingType(tabletType topodatapb.TabletType) error { err := client.server.SetServingType(tabletType, time.Time{}, true /* serving */, "" /* reason */) - // Wait for TwoPC transition, if necessary - client.server.TwoPCEngineWait() return err } diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 45dd51670ba..21560f9d34b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -78,7 +78,20 @@ func (tm *TabletManager) SetReadOnly(ctx context.Context, rdonly bool) error { return err } defer tm.unlock() + superRo, err := tm.MysqlDaemon.IsSuperReadOnly(ctx) + if err != nil { + return err + } + if !rdonly && superRo { + // If super read only is set, then we need to prepare the transactions before setting read_only OFF. + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. + // setting read_only OFF will also set super_read_only OFF if it was set. + // If super read only is already off, then we probably called this function from PRS or some other place + // because it is idempotent. We only need to redo prepared transactions the first time we transition from super read only + // to read write. + return tm.redoPreparedTransactionsAndSetReadWrite(ctx) + } return tm.MysqlDaemon.SetReadOnly(ctx, rdonly) } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 3e745222092..b34e94a16a7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -544,9 +544,10 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure defer func() { if finalErr != nil && revertPartialFailure && !wasReadOnly { + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. // setting read_only OFF will also set super_read_only OFF if it was set - if err := tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil { - log.Warningf("SetReadOnly(false) failed during revert: %v", err) + if err = tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil { + log.Warningf("RedoPreparedTransactionsAndSetReadWrite failed during revert: %v", err) } } }() @@ -599,8 +600,8 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e return err } - // Now, set the server read-only false. - if err := tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil { + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. 
+	if err = tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil {
 		return err
 	}
 
diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go
index 6046ed99727..2e70596b686 100644
--- a/go/vt/vttablet/tabletmanager/tm_init.go
+++ b/go/vt/vttablet/tabletmanager/tm_init.go
@@ -50,6 +50,7 @@ import (
 	"vitess.io/vitess/go/constants/sidecar"
 	"vitess.io/vitess/go/flagutil"
 	"vitess.io/vitess/go/mysql/collations"
+	"vitess.io/vitess/go/mysql/sqlerror"
 	"vitess.io/vitess/go/netutil"
 	"vitess.io/vitess/go/protoutil"
 	"vitess.io/vitess/go/sets"
@@ -751,6 +752,24 @@ func (tm *TabletManager) findMysqlPort(retryInterval time.Duration) {
 	}
 }
 
+// redoPreparedTransactionsAndSetReadWrite redoes prepared transactions in read-only mode.
+// We turn off super read only mode, and then redo the transactions. Finally, we turn off read-only mode to allow for further traffic.
+func (tm *TabletManager) redoPreparedTransactionsAndSetReadWrite(ctx context.Context) error {
+	_, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false)
+	if err != nil {
+		// Ignore the error if the server doesn't support the super read only variable.
+		// We should just redo the prepared transactions before we set it to read-write.
+		if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable {
+			log.Warningf("server does not know about super_read_only, continuing anyway...")
+		} else {
+			return err
+		}
+	}
+	tm.QueryServiceControl.RedoPreparedTransactions()
+	err = tm.MysqlDaemon.SetReadOnly(ctx, false)
+	return err
+}
+
 func (tm *TabletManager) initTablet(ctx context.Context) error {
 	tablet := tm.Tablet()
 	err := tm.TopoServer.CreateTablet(ctx, tablet)
diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go
index d9389bf3559..cf56c515cfc 100644
--- a/go/vt/vttablet/tabletmanager/tm_state.go
+++ b/go/vt/vttablet/tabletmanager/tm_state.go
@@ -214,9 +214,10 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T
 	}
 
 	if action == DBActionSetReadWrite {
+		// We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them.
 		// We call SetReadOnly only after the topo has been updated to avoid
 		// situations where two tablets are primary at the DB level but not at the vitess level
-		if err := ts.tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil {
+		if err = ts.tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil {
 			return err
 		}
 	}
diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go
index 0336d9a73cc..69d2edbfdc1 100644
--- a/go/vt/vttablet/tabletserver/controller.go
+++ b/go/vt/vttablet/tabletserver/controller.go
@@ -93,6 +93,8 @@ type Controller interface {
 	// CheckThrottler
 	CheckThrottler(ctx context.Context, appName string, flags *throttle.CheckFlags) *throttle.CheckResult
 	GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus
+
+	RedoPreparedTransactions()
 }
 
 // Ensure TabletServer satisfies Controller interface.
diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go
index 1fd1df12d56..a08cd9dc635 100644
--- a/go/vt/vttablet/tabletserver/dt_executor.go
+++ b/go/vt/vttablet/tabletserver/dt_executor.go
@@ -221,13 +221,13 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
 	// If the connection is tainted, we cannot take a commit decision on it.
if conn.IsTainted() { dte.inTransaction(func(conn *StatefulConnection) error { - return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback) }) // return the error, defer call above will roll back the transaction. return vterrors.VT10002("cannot commit the transaction on a reserved connection") } - err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT) + err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateCommit) if err != nil { return err } @@ -254,7 +254,7 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { } return dte.inTransaction(func(conn *StatefulConnection) error { - return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback) }) } diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 771d9e3479d..cc72c629ddb 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1532,9 +1532,6 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb tsv := NewTabletServer(ctx, vtenv.NewTestEnv(), "TabletServerTest", cfg, memorytopo.NewServer(ctx, ""), &topodatapb.TabletAlias{}, srvTopoCounts) target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} err := tsv.StartService(target, dbconfigs, nil /* mysqld */) - if cfg.TwoPCEnable { - tsv.TwoPCEngineWait() - } if err != nil { panic(err) } diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 308f9165ba6..3fe78457b60 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -164,6 +164,7 @@ type ( AcceptReadWrite() AcceptReadOnly() Close() + RollbackPrepared() } subComponent interface { @@ -610,6 +611,12 @@ func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) log.Infof("Killed all stateless OLTP queries.") sm.statefulql.TerminateAll() log.Infof("Killed all OLTP queries.") + // We can rollback prepared transactions only after we have killed all the write queries in progress. + // This is essential because when we rollback a prepared transaction, it lets go of the locks it was holding. + // If there were some other conflicting write in progress that hadn't been killed, then it could potentially go through + // and cause data corruption since we won't be able to prepare the transaction again. 
+	sm.te.RollbackPrepared()
+	log.Infof("Rolled back all prepared transactions")
 	}()
 	return cancel
 }
diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go
index 02896eeefe0..f70e77de710 100644
--- a/go/vt/vttablet/tabletserver/state_manager_test.go
+++ b/go/vt/vttablet/tabletserver/state_manager_test.go
@@ -379,6 +379,9 @@ func (te *delayedTxEngine) Close() {
 	time.Sleep(50 * time.Millisecond)
 }
 
+func (te *delayedTxEngine) RollbackPrepared() {
+}
+
 type killableConn struct {
 	id     int64
 	killed atomic.Bool
@@ -903,6 +906,8 @@ func (te *testTxEngine) Close() {
 	te.state = testStateClosed
 }
 
+func (te *testTxEngine) RollbackPrepared() {}
+
 type testSubcomponent struct {
 	testOrderState
 }
diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go
index c0dc973fa87..91d51677241 100644
--- a/go/vt/vttablet/tabletserver/stateful_connection.go
+++ b/go/vt/vttablet/tabletserver/stateful_connection.go
@@ -174,7 +174,9 @@ func (sc *StatefulConnection) ReleaseString(reason string) {
 	if sc.dbConn == nil {
 		return
 	}
-	sc.pool.unregister(sc.ConnID, reason)
+	if sc.pool != nil {
+		sc.pool.unregister(sc.ConnID, reason)
+	}
 	sc.dbConn.Recycle()
 	sc.dbConn = nil
 	sc.logReservedConn()
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index e3e951892b7..62cc5ca32f0 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -476,11 +476,6 @@ func (tsv *TabletServer) TableGC() *gc.TableGC {
 	return tsv.tableGC
 }
 
-// TwoPCEngineWait waits until the TwoPC engine has been opened, and the redo read
-func (tsv *TabletServer) TwoPCEngineWait() {
-	tsv.te.twoPCReady.Wait()
-}
-
 // SchemaEngine returns the SchemaEngine part of TabletServer.
 func (tsv *TabletServer) SchemaEngine() *schema.Engine {
 	return tsv.se
@@ -1692,6 +1687,11 @@ func (tsv *TabletServer) GetThrottlerStatus(ctx context.Context) *throttle.Throt
 	return r
 }
 
+// RedoPreparedTransactions redoes the prepared transactions.
+func (tsv *TabletServer) RedoPreparedTransactions() {
+	tsv.te.RedoPreparedTransactions()
+}
+
 // HandlePanic is part of the queryservice.QueryService interface
 func (tsv *TabletServer) HandlePanic(err *error) {
 	if x := recover(); x != nil {
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index 7ffd201c0a4..7f863e26df7 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -154,9 +154,10 @@ func TestTabletServerPrimaryToReplica(t *testing.T) {
 	// Reuse code from tx_executor_test.
 	_, tsv, db := newTestTxExecutor(t, ctx)
 	// This is required because the test is verifying that we rollback transactions on changing serving type,
-	// but that only happens immediately if the shut down grace period is not specified.
-	tsv.te.shutdownGracePeriod = 0
-	tsv.sm.shutdownGracePeriod = 0
+	// but that only happens when we have a shutdown grace period; otherwise we wait for transactions to be resolved
+	// indefinitely.
+	tsv.te.shutdownGracePeriod = 1
+	tsv.sm.shutdownGracePeriod = 1
 	defer tsv.StopService()
 	defer db.Close()
 	target := querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
@@ -200,14 +201,20 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
 	_, tsv, db := newTestTxExecutor(t, ctx)
 	defer tsv.StopService()
 	defer db.Close()
-	tsv.SetServingType(topodatapb.TabletType_REPLICA, time.Time{}, true, "")
+	// This is required because the test is verifying that we rollback transactions on changing serving type,
+	// but that only happens when we have a shutdown grace period; otherwise we wait for transactions to be resolved
+	// indefinitely.
+	tsv.te.shutdownGracePeriod = 1
+	tsv.sm.shutdownGracePeriod = 1
+	tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, false, "")
 	turnOnTxEngine := func() {
 		tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, true, "")
-		tsv.TwoPCEngineWait()
 	}
 	turnOffTxEngine := func() {
-		tsv.SetServingType(topodatapb.TabletType_REPLICA, time.Time{}, true, "")
+		// We can use a transition to PRIMARY non-serving or REPLICA serving to turn off the transaction engine.
+		// With PRIMARY non-serving, the shutdown of prepared transactions is synchronous, but for the latter it's asynchronous.
+		tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, false, "")
 	}
 	tpc := tsv.te.twoPC
diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go
index 0bdf4ac0c91..b3c5ab628c3 100644
--- a/go/vt/vttablet/tabletserver/twopc.go
+++ b/go/vt/vttablet/tabletserver/twopc.go
@@ -374,7 +374,7 @@ func (tpc *TwoPC) ReadTransaction(ctx context.Context, dtid string) (*querypb.Tr
 		return nil, vterrors.Wrapf(err, "error parsing state for dtid %s", dtid)
 	}
 	result.State = querypb.TransactionState(st)
-	if result.State < querypb.TransactionState_PREPARE || result.State > querypb.TransactionState_COMMIT {
+	if result.State < DTStatePrepare || result.State > DTStateCommit {
 		return nil, fmt.Errorf("unexpected state for dtid %s: %v", dtid, result.State)
 	}
 	// A failure in time parsing will show up as a very old time,
@@ -427,7 +427,7 @@ func (tpc *TwoPC) ReadAllTransactions(ctx context.Context) ([]*tx.DistributedTx,
 		log.Errorf("Error parsing state for dtid %s: %v.", dtid, err)
 	}
 	protostate := querypb.TransactionState(st)
-	if protostate < querypb.TransactionState_PREPARE || protostate > querypb.TransactionState_COMMIT {
+	if protostate < DTStatePrepare || protostate > DTStateCommit {
 		log.Errorf("Unexpected state for dtid %s: %v.", dtid, protostate)
 	}
 	curTx = &tx.DistributedTx{
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 33e22e321bc..ea4e0b1e41d 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -87,7 +87,6 @@ type TxEngine struct {
 	txPool       *TxPool
 	preparedPool *TxPreparedPool
 	twoPC        *TwoPC
-	twoPCReady   sync.WaitGroup
 	dxNotify     func()
 }
 
@@ -128,9 +127,6 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
 }
 
 // AcceptReadWrite will start accepting all transactions.
-// If transitioning from RO mode, transactions are rolled
-// back before accepting new transactions. This is to allow
-// for 2PC state to be correctly initialized.
@@ -149,37 +145,70 @@ func (te *TxEngine) transition(state txEngineState) {
 	}
 	log.Infof("TxEngine transition: %v", state)
-	switch te.state {
-	case AcceptingReadOnly, AcceptingReadAndWrite:
+
+	// When we are transitioning from the read-write state, we should close all transactions.
+	if te.state == AcceptingReadAndWrite {
 		te.shutdownLocked()
-	case NotServing:
-		// No special action.
 	}
 	te.state = state
+	if te.twopcEnabled && te.state == AcceptingReadAndWrite {
+		// If the prepared pool is not open, then we need to redo the prepared transactions
+		// before we open the transaction engine to accept new writes.
+		// This check is required because during a promotion, we would have already set up the prepared pool
+		// and redone the prepared transactions when we turned super_read_only off. So we don't need to do it again.
+		if !te.preparedPool.IsOpen() {
+			// We need to redo prepared transactions here to handle vttablet restarts.
+			// If MySQL continues to work fine, then we won't end up redoing the prepared transactions as part of any RPC call
+			// since VTOrc won't call `UndoDemotePrimary`. We need to do them as part of this transition.
+			te.redoPreparedTransactionsLocked()
+		}
+		te.startTransactionWatcher()
+	}
 	te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB())
+}
 
-	if te.twopcEnabled && te.state == AcceptingReadAndWrite {
-		// Set the preparedPool to start accepting connections.
-		te.preparedPool.shutdown = false
-		// If there are errors, we choose to raise an alert and
-		// continue anyway. Serving traffic is considered more important
-		// than blocking everything for the sake of a few transactions.
-		// We do this async; so we do not end up blocking writes on
-		// failover for our setup tasks if using semi-sync replication.
-		te.twoPCReady.Add(1)
-		go func() {
-			defer te.twoPCReady.Done()
-			if err := te.twoPC.Open(te.env.Config().DB); err != nil {
-				te.env.Stats().InternalErrors.Add("TwopcOpen", 1)
-				log.Errorf("Could not open TwoPC engine: %v", err)
-			}
-			if err := te.prepareFromRedo(); err != nil {
-				te.env.Stats().InternalErrors.Add("TwopcResurrection", 1)
-				log.Errorf("Could not prepare transactions: %v", err)
-			}
-			te.startTransactionWatcher()
-		}()
+// RedoPreparedTransactions acquires the state lock and calls redoPreparedTransactionsLocked.
+func (te *TxEngine) RedoPreparedTransactions() {
+	if te.twopcEnabled {
+		te.stateLock.Lock()
+		defer te.stateLock.Unlock()
+		te.redoPreparedTransactionsLocked()
+	}
+}
+
+// redoPreparedTransactionsLocked redoes the prepared transactions.
+// If there are errors, we choose to raise an alert and
+// continue anyway. Serving traffic is considered more important
+// than blocking everything for the sake of a few transactions.
+// We do this async; so we do not end up blocking writes on
+// failover for our setup tasks if using semi-sync replication.
+func (te *TxEngine) redoPreparedTransactionsLocked() {
+	oldState := te.state
+	// We shutdown to ensure no other writes are in progress.
+	te.shutdownLocked()
+	defer func() {
+		te.state = oldState
+	}()
+
+	if err := te.twoPC.Open(te.env.Config().DB); err != nil {
+		te.env.Stats().InternalErrors.Add("TwopcOpen", 1)
+		log.Errorf("Could not open TwoPC engine: %v", err)
+		return
+	}
+
+	// We should only open the prepared pool and the transaction pool if the opening of the twoPC pool is successful.
+	// We use the prepared pool being open to know if we need to redo the prepared transactions.
+	// So if we opened the prepared pool first and the twoPC open then failed, we would never end up opening the twoPC pool at all!
+	// This is why opening the prepared pool after the twoPC pool is crucial for correctness.
+	te.preparedPool.Open()
+	// We have to defer opening the transaction pool because we call shutdown in the beginning, which closes it.
+	// We want to open the transaction pool after prepareFromRedo has run. Also, we want this to run even if that fails.
+	defer te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB())
+
+	if err := te.prepareFromRedo(); err != nil {
+		te.env.Stats().InternalErrors.Add("TwopcResurrection", 1)
+		log.Errorf("Could not prepare transactions: %v", err)
+	}
 }
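The "redo only once" idea above is worth isolating: the prepared pool's open flag doubles as a marker that the redo already ran (for example during a promotion), so a later transition to read-write can skip it. A minimal sketch under that assumption, with hypothetical trimmed-down types:

package main

import "fmt"

type preparedPool struct{ open bool }

type txEngine struct {
	twopcEnabled bool
	pool         *preparedPool
}

func (te *txEngine) acceptReadWrite() {
	if te.twopcEnabled && !te.pool.open {
		fmt.Println("redoing prepared transactions")
		te.pool.open = true // set only once the redo has succeeded
	}
	fmt.Println("accepting writes")
}

func main() {
	te := &txEngine{twopcEnabled: true, pool: &preparedPool{}}
	te.acceptReadWrite() // redoes prepared transactions
	te.acceptReadWrite() // pool already open: skips the redo
}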
@@ -306,11 +335,6 @@ func (te *TxEngine) shutdownLocked() {
 	te.stateLock.Lock()
 	log.Infof("TxEngine - state lock acquired again")
 
-	// Shut down functions are idempotent.
-	// No need to check if 2pc is enabled.
-	log.Infof("TxEngine - stop watchdog")
-	te.stopTransactionWatcher()
-
 	poolEmpty := make(chan bool)
 	rollbackDone := make(chan bool)
 	// This goroutine decides if transactions have to be
@@ -333,13 +357,6 @@ func (te *TxEngine) shutdownLocked() {
 		// connections.
 		te.txPool.scp.ShutdownNonTx()
 		if te.shutdownGracePeriod <= 0 {
-			// No grace period was specified. Wait indefinitely for transactions to be concluded.
-			// TODO(sougou): invoking rollbackPrepared is incorrect here. Prepared statements should
-			// actually be rolled back last. But this will cause the shutdown to hang because the
-			// tx pool will never become empty, because the prepared pool is holding on to connections
-			// from the tx pool. But we plan to deprecate this approach to 2PC. So, this
-			// should eventually be deleted.
-			te.rollbackPrepared()
 			log.Info("No grace period specified: performing normal wait.")
 			return
 		}
@@ -354,6 +371,9 @@ func (te *TxEngine) shutdownLocked() {
 			log.Info("Transactions completed before grace period: shutting down.")
 		}
 	}()
+	// It is important to note that we aren't rolling back prepared transactions here.
+	// That happens in the same place where we are killing queries. This will block
+	// until all prepared transactions are either resolved or rolled back.
 	log.Infof("TxEngine - waiting for empty txPool")
 	te.txPool.WaitForEmpty()
 	// If the goroutine is still running, signal that it can exit.
@@ -362,10 +382,19 @@ func (te *TxEngine) shutdownLocked() {
 	log.Infof("TxEngine - making sure the goroutine has returned")
 	<-rollbackDone
 
+	// We stop the transaction watcher this late because, if the user isn't running
+	// with any shutdown grace period, we still want the watcher to run while we are
+	// waiting for transactions to be resolved.
+	log.Infof("TxEngine - stop transaction watcher")
+	te.stopTransactionWatcher()
+
+	// Mark the prepared pool closed.
 	log.Infof("TxEngine - closing the txPool")
 	te.txPool.Close()
 	log.Infof("TxEngine - closing twoPC")
 	te.twoPC.Close()
+	log.Infof("TxEngine - closing the prepared pool")
+	te.preparedPool.Close()
 	log.Infof("TxEngine - finished shutdownLocked")
 }
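The ordering in shutdownLocked is the point of this hunk, so here is the sequence pulled out as a runnable sketch (hypothetical minimal types; illustrative only): drain the transaction pool first, keep the watcher alive during that wait, and only then stop the watcher and close the pools.

package main

import "fmt"

type component struct{ name string }

func (c *component) stop() { fmt.Println("stopped:", c.name) }

func main() {
	txPool := &component{name: "txPool"}
	watcher := &component{name: "transaction watcher"}
	twoPC := &component{name: "twoPC"}
	prepared := &component{name: "prepared pool"}

	// Prepared transactions may still get resolved while we wait,
	// which is exactly why the watcher must not be stopped yet.
	fmt.Println("waiting for txPool to empty")

	watcher.stop() // stopped late, on purpose
	txPool.stop()
	twoPC.stop()
	prepared.stop()
}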
@@ -391,16 +420,17 @@ outer:
 		if txid > maxid {
 			maxid = txid
 		}
-		conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
+		// We need to redo the prepared transactions using a dba user because MySQL might still be in read-only mode.
+		conn, err := te.beginNewDbaConnection(ctx)
 		if err != nil {
-			allErr.RecordError(err)
+			allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
 			continue
 		}
 		for _, stmt := range preparedTx.Queries {
 			conn.TxProperties().RecordQuery(stmt)
 			_, err := conn.Exec(ctx, stmt, 1, false)
 			if err != nil {
-				allErr.RecordError(err)
+				allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
 				te.txPool.RollbackAndRelease(ctx, conn)
 				continue outer
 			}
 		}
@@ -409,7 +439,7 @@
 		// we don't want to write again to the redo log.
 		err = te.preparedPool.Put(conn, preparedTx.Dtid)
 		if err != nil {
-			allErr.RecordError(err)
+			allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
 			continue
 		}
 	}
@@ -428,21 +458,21 @@ outer:
 	return allErr.Error()
 }
 
-// shutdownTransactions rolls back all open transactions
-// including the prepared ones.
-// This is used for transitioning from a primary to a non-primary
-// serving type.
+// shutdownTransactions rolls back all open transactions that are idle.
+// These are transactions that are open but have no write executing on them right now.
+// By definition, prepared transactions aren't part of them, since these are transactions on which
+// the user has issued a commit command. Those transactions are rolled back elsewhere, when we kill all writes.
+// This is used for transitioning from a primary to a non-primary serving type.
 func (te *TxEngine) shutdownTransactions() {
-	te.rollbackPrepared()
 	ctx := tabletenv.LocalContext()
-	// The order of rollbacks is currently not material because
-	// we don't allow new statements or commits during
-	// this function. In case of any such change, this will
-	// have to be revisited.
 	te.txPool.Shutdown(ctx)
 }
 
-func (te *TxEngine) rollbackPrepared() {
+// RollbackPrepared rolls back all the prepared transactions.
+// This should only be called after we are certain no other writes are in progress.
+// If some other conflicting write were still in progress and hadn't been killed, it could potentially go through
+// and cause data corruption, since we won't be able to prepare the transaction again.
+func (te *TxEngine) RollbackPrepared() {
 	ctx := tabletenv.LocalContext()
 	for _, conn := range te.preparedPool.FetchAllForRollback() {
 		te.txPool.Rollback(ctx, conn)
@@ -581,3 +611,22 @@ func (te *TxEngine) Release(connID int64) error {
 
 	return nil
 }
+
+// beginNewDbaConnection gets a new dba connection and starts a transaction in it.
+// This should only be used to redo prepared transactions. All the other writes should use the normal pool.
+func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnection, error) {
+	dbConn, err := connpool.NewConn(ctx, te.env.Config().DB.DbaWithDB(), nil, nil, te.env)
+	if err != nil {
+		return nil, err
+	}
+
+	sc := &StatefulConnection{
+		dbConn: &connpool.PooledConn{
+			Conn: dbConn,
+		},
+		env: te.env,
+	}
+
+	_, _, err = te.txPool.begin(ctx, nil, false, sc, nil)
+	return sc, err
+}
diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go
index d5376172856..c801e208e33 100644
--- a/go/vt/vttablet/tabletserver/tx_prep_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go
@@ -36,8 +36,8 @@ type TxPreparedPool struct {
 	mu       sync.Mutex
 	conns    map[string]*StatefulConnection
 	reserved map[string]error
-	// shutdown tells if the prepared pool has been drained and shutdown.
-	shutdown bool
+	// open tells if the prepared pool is open for accepting transactions.
+	open     bool
 	capacity int
 }
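The open flag above gates every Put under the mutex, so prepared transactions are only accepted while the pool is explicitly open. A trimmed-down, runnable sketch of that gate (hypothetical type, not the real TxPreparedPool):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type prepPool struct {
	mu    sync.Mutex
	open  bool
	conns map[string]struct{}
}

func (pp *prepPool) Open() {
	pp.mu.Lock()
	defer pp.mu.Unlock()
	pp.open = true
}

// Put registers a prepared transaction, but only while the pool is open.
func (pp *prepPool) Put(dtid string) error {
	pp.mu.Lock()
	defer pp.mu.Unlock()
	if !pp.open {
		return errors.New("pool is shutdown")
	}
	pp.conns[dtid] = struct{}{}
	return nil
}

func main() {
	pp := &prepPool{conns: map[string]struct{}{}}
	fmt.Println(pp.Put("dtid-1")) // pool is shutdown
	pp.Open()
	fmt.Println(pp.Put("dtid-1")) // <nil>
}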
@@ -60,7 +60,7 @@ func (pp *TxPreparedPool) Put(c *StatefulConnection, dtid string) error {
 	pp.mu.Lock()
 	defer pp.mu.Unlock()
 	// If the pool is shutdown, we don't accept new prepared transactions.
-	if pp.shutdown {
+	if !pp.open {
 		return vterrors.VT09025("pool is shutdown")
 	}
 	if _, ok := pp.reserved[dtid]; ok {
@@ -93,6 +93,27 @@ func (pp *TxPreparedPool) FetchForRollback(dtid string) *StatefulConnection {
 	return c
 }
 
+// Open marks the prepared pool open for use.
+func (pp *TxPreparedPool) Open() {
+	pp.mu.Lock()
+	defer pp.mu.Unlock()
+	pp.open = true
+}
+
+// Close marks the prepared pool closed.
+func (pp *TxPreparedPool) Close() {
+	pp.mu.Lock()
+	defer pp.mu.Unlock()
+	pp.open = false
+}
+
+// IsOpen checks if the prepared pool is open for use.
+func (pp *TxPreparedPool) IsOpen() bool {
+	pp.mu.Lock()
+	defer pp.mu.Unlock()
+	return pp.open
+}
+
 // FetchForCommit returns the connection for commit. Before returning,
 // it remembers the dtid in its reserved list as "committing". If
 // the dtid is already in the reserved list, it returns an error.
@@ -105,7 +126,7 @@ func (pp *TxPreparedPool) FetchForCommit(dtid string) (*StatefulConnection, erro
 	defer pp.mu.Unlock()
 	// If the pool is shutdown, we don't have any connections to return.
 	// That however doesn't mean this transaction was committed, it could very well have been rollbacked.
-	if pp.shutdown {
+	if !pp.open {
 		return nil, vterrors.VT09025("pool is shutdown")
 	}
 	if err, ok := pp.reserved[dtid]; ok {
@@ -139,7 +160,7 @@ func (pp *TxPreparedPool) Forget(dtid string) {
 func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection {
 	pp.mu.Lock()
 	defer pp.mu.Unlock()
-	pp.shutdown = true
+	pp.open = false
 	conns := make([]*StatefulConnection, 0, len(pp.conns))
 	for _, c := range pp.conns {
 		conns = append(conns, c)
diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
index 42e2b800e0e..43c0c022b13 100644
--- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
+++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
@@ -24,13 +24,13 @@ import (
 )
 
 func TestEmptyPrep(t *testing.T) {
-	pp := NewTxPreparedPool(0)
+	pp := createAndOpenPreparedPool(0)
 	err := pp.Put(nil, "aa")
 	require.ErrorContains(t, err, "prepared transactions exceeded limit: 0")
 }
 
 func TestPrepPut(t *testing.T) {
-	pp := NewTxPreparedPool(2)
+	pp := createAndOpenPreparedPool(2)
 	err := pp.Put(nil, "aa")
 	require.NoError(t, err)
 	err = pp.Put(nil, "bb")
@@ -50,7 +50,7 @@ func TestPrepPut(t *testing.T) {
 }
 
 func TestPrepFetchForRollback(t *testing.T) {
-	pp := NewTxPreparedPool(2)
+	pp := createAndOpenPreparedPool(2)
 	conn := &StatefulConnection{}
 	pp.Put(conn, "aa")
 	got := pp.FetchForRollback("bb")
@@ -68,7 +68,7 @@ func TestPrepFetchForRollback(t *testing.T) {
 }
 
 func TestPrepFetchForCommit(t *testing.T) {
-	pp := NewTxPreparedPool(2)
+	pp := createAndOpenPreparedPool(2)
 	conn := &StatefulConnection{}
 	got, err := pp.FetchForCommit("aa")
 	require.NoError(t, err)
@@ -97,7 +97,7 @@ func TestPrepFetchForCommit(t *testing.T) {
 }
 
 func TestPrepFetchAll(t *testing.T) {
-	pp := NewTxPreparedPool(2)
+	pp := createAndOpenPreparedPool(2)
 	conn1 := &StatefulConnection{}
 	conn2 := &StatefulConnection{}
 	pp.Put(conn1, "aa")
@@ -108,3 +108,11 @@ func TestPrepFetchAll(t *testing.T) {
 	_, err := pp.FetchForCommit("aa")
 	require.ErrorContains(t, err, "pool is shutdown")
 }
+
+// createAndOpenPreparedPool creates a new transaction prepared pool and opens it.
+// Used as a helper function for testing.
+func createAndOpenPreparedPool(capacity int) *TxPreparedPool {
+	pp := NewTxPreparedPool(capacity)
+	pp.Open()
+	return pp
+}
diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go
index 7c7055b3e15..52bb71abcd9 100644
--- a/go/vt/vttablet/tabletservermock/controller.go
+++ b/go/vt/vttablet/tabletservermock/controller.go
@@ -226,6 +226,9 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
 	return nil
 }
 
+// RedoPreparedTransactions is part of the tabletserver.Controller interface
+func (tqsc *Controller) RedoPreparedTransactions() {}
+
 // EnterLameduck implements tabletserver.Controller.
 func (tqsc *Controller) EnterLameduck() {
 	tqsc.mu.Lock()
diff --git a/test/config.json b/test/config.json
index 49f77e1b7fb..f1a8f1bcf74 100644
--- a/test/config.json
+++ b/test/config.json
@@ -842,9 +842,9 @@
         "RetryMax": 1,
         "Tags": []
     },
-    "vtgate_transaction_twopc_fuzzer": {
+    "vtgate_transaction_twopc_stress": {
         "File": "unused.go",
-        "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/fuzzer"],
+        "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/stress"],
         "Command": [],
         "Manual": false,
         "Shard": "vtgate_transaction",

From 54589415ae483d4814b6c915176d94e7ce12871b Mon Sep 17 00:00:00 2001
From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com>
Date: Wed, 21 Aug 2024 13:36:42 +0200
Subject: [PATCH 3/6] VReplication workflow package: unit tests for
 StreamMigrator, Mount et al (#16498)

Signed-off-by: Rohit Nayak
---
 go/vt/vtctl/workflow/framework_test.go        |  33 ++-
 go/vt/vtctl/workflow/materializer_env_test.go |   4 +-
 go/vt/vtctl/workflow/materializer_test.go     |  14 +-
 go/vt/vtctl/workflow/mount_test.go            |  77 +++++
 go/vt/vtctl/workflow/resharder_test.go        |   4 +-
 go/vt/vtctl/workflow/stream_migrator_test.go  | 275 +++++++++++++++++-
 go/vt/vtctl/workflow/utils_test.go            |  73 +++++
 .../workflow/vreplication_stream_test.go      |  52 ++++
 8 files changed, 523 insertions(+), 9 deletions(-)
 create mode 100644 go/vt/vtctl/workflow/mount_test.go
 create mode 100644 go/vt/vtctl/workflow/vreplication_stream_test.go

diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go
index 1d25aafa75f..b5d0a308261 100644
--- a/go/vt/vtctl/workflow/framework_test.go
+++ b/go/vt/vtctl/workflow/framework_test.go
@@ -256,6 +256,9 @@ type testTMClient struct {
 	readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
 	primaryPositions map[uint32]string
 
+	// Queue of ReadVReplicationWorkflowsResponse to return, in order, for each shard
+	readVReplicationWorkflowsResponses map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse
+
 	env     *testEnv    // For access to the env config from tmc methods.
 	reverse atomic.Bool // Are we reversing traffic?
 	frozen  atomic.Bool // Are the workflows frozen?
@@ -267,6 +270,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
 		vrQueries:                          make(map[int][]*queryResult),
 		createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
 		readVReplicationWorkflowRequests:   make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
+		readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
 		primaryPositions:                   make(map[uint32]string),
 		env:                                env,
 	}
@@ -285,6 +289,10 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
 	return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
 }
 
+func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string {
+	return fmt.Sprintf("%s/%s", keyspace, shard)
+}
+
 func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
 	tmc.mu.Lock()
 	defer tmc.mu.Unlock()
@@ -463,6 +471,10 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *
 	tmc.mu.Lock()
 	defer tmc.mu.Unlock()
 
+	workflowKey := tmc.GetWorkflowKey(tablet.Keyspace, tablet.Shard)
+	if resp := tmc.getVReplicationWorkflowsResponse(workflowKey); resp != nil {
+		return resp, nil
+	}
 	workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
 	if len(req.IncludeWorkflows) > 0 {
 		for _, wf := range req.IncludeWorkflows {
@@ -494,7 +506,7 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *
 						},
 					},
 				},
-				Pos:           "MySQL56/" + position,
+				Pos:           position,
 				TimeUpdated:   protoutil.TimeToProto(time.Now()),
 				TimeHeartbeat: protoutil.TimeToProto(time.Now()),
 			},
@@ -541,6 +553,25 @@ func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *top
 	return nil
 }
 
+func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse) {
+	tmc.mu.Lock()
+	defer tmc.mu.Unlock()
+	tmc.readVReplicationWorkflowsResponses[key] = append(tmc.readVReplicationWorkflowsResponses[key], resp)
+}
+
+func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse {
+	if len(tmc.readVReplicationWorkflowsResponses) == 0 {
+		return nil
+	}
+	responses, ok := tmc.readVReplicationWorkflowsResponses[key]
+	if !ok || len(responses) == 0 {
+		return nil
+	}
+	resp := tmc.readVReplicationWorkflowsResponses[key][0]
+	tmc.readVReplicationWorkflowsResponses[key] = tmc.readVReplicationWorkflowsResponses[key][1:]
+	return resp
+}
+
 //
 // Utility / helper functions.
 //
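The two helpers just above implement a per-key queue of canned responses that tests consume front-first; a nil entry falls through to the client's default synthesized response. A small, self-contained sketch of the same pattern with a hypothetical fake client:

package main

import "fmt"

type fakeClient struct {
	responses map[string][]string
}

func (c *fakeClient) push(key, resp string) {
	c.responses[key] = append(c.responses[key], resp)
}

// next pops the oldest scripted response for key, or returns ""
// to signal "use the default behavior" when the queue is drained.
func (c *fakeClient) next(key string) string {
	queue := c.responses[key]
	if len(queue) == 0 {
		return ""
	}
	c.responses[key] = queue[1:]
	return queue[0]
}

func main() {
	c := &fakeClient{responses: map[string][]string{}}
	c.push("ks/0", "first")
	c.push("ks/0", "second")
	fmt.Println(c.next("ks/0")) // first
	fmt.Println(c.next("ks/0")) // second
	fmt.Println(c.next("ks/0")) // "" (queue drained)
}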
diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go
index 569651f85ca..aada59c244d 100644
--- a/go/vt/vtctl/workflow/materializer_env_test.go
+++ b/go/vt/vtctl/workflow/materializer_env_test.go
@@ -61,7 +61,7 @@ type testMaterializerEnv struct {
 	venv *vtenv.Environment
 }
 
-//----------------------------------------------
+// ----------------------------------------------
 // testMaterializerEnv
 
 func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sourceShards, targetShards []string) *testMaterializerEnv {
@@ -426,7 +426,7 @@ func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Conte
 					},
 				},
 			},
-			Pos:           "MySQL56/" + position,
+			Pos:           position,
 			TimeUpdated:   protoutil.TimeToProto(time.Now()),
 			TimeHeartbeat: protoutil.TimeToProto(time.Now()),
 		}
diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go
index 51a7d22d5eb..763dd7c04d3 100644
--- a/go/vt/vtctl/workflow/materializer_test.go
+++ b/go/vt/vtctl/workflow/materializer_test.go
@@ -44,7 +44,7 @@ import (
 )
 
 const (
-	position            = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97"
+	position            = "MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97"
 	mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
 	mzCheckJournal      = "/select val from _vt.resharding_journal where id="
 	mzGetCopyState      = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
@@ -56,6 +56,14 @@ var (
 	defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()
 )
 
+func gtid(position string) string {
+	arr := strings.Split(position, "/")
+	if len(arr) != 2 {
+		return ""
+	}
+	return arr[1]
+}
+
 func TestStripForeignKeys(t *testing.T) {
 	tcs := []struct {
 		desc string
@@ -577,7 +585,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
 			sourceShard, err := env.topoServ.GetShardNames(ctx, ms.SourceKeyspace)
 			require.NoError(t, err)
 			want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"%s\" status:\"Running\" info:\"VStream Lag: 0s\"}}} traffic_state:\"Reads Not Switched. Writes Not Switched\"",
-				ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0], position)
+				ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0], gtid(position))
 
 			res, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
 				Workflow: ms.Workflow,
@@ -636,7 +644,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
 					Uid:  200,
 				},
 				SourceShard: fmt.Sprintf("%s/%s", ms.SourceKeyspace, sourceShard[0]),
				Position:    gtid(position),
 				Status:      binlogdatapb.VReplicationWorkflowState_Running.String(),
 				Info:        "VStream Lag: 0s",
 			},
diff --git a/go/vt/vtctl/workflow/mount_test.go b/go/vt/vtctl/workflow/mount_test.go
new file mode 100644
index 00000000000..2fec275e4cb
--- /dev/null
+++ b/go/vt/vtctl/workflow/mount_test.go
@@ -0,0 +1,77 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workflow
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
+	"vitess.io/vitess/go/vt/topo/memorytopo"
+	"vitess.io/vitess/go/vt/vtenv"
+)
+
+// TestMount tests various Mount-related methods.
+func TestMount(t *testing.T) {
+	const (
+		extCluster = "extcluster"
+		topoType   = "etcd2"
+		topoServer = "localhost:2379"
+		topoRoot   = "/vitess/global"
+	)
+	ctx := context.Background()
+	ts := memorytopo.NewServer(ctx, "cell")
+	tmc := &fakeTMC{}
+	s := NewServer(vtenv.NewTestEnv(), ts, tmc)
+
+	resp, err := s.MountRegister(ctx, &vtctldatapb.MountRegisterRequest{
+		Name:       extCluster,
+		TopoType:   topoType,
+		TopoServer: topoServer,
+		TopoRoot:   topoRoot,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+
+	respList, err := s.MountList(ctx, &vtctldatapb.MountListRequest{})
+	require.NoError(t, err)
+	require.NotNil(t, respList)
+	require.Equal(t, []string{extCluster}, respList.Names)
+
+	respShow, err := s.MountShow(ctx, &vtctldatapb.MountShowRequest{
+		Name: extCluster,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, respShow)
+	require.Equal(t, extCluster, respShow.Name)
+	require.Equal(t, topoType, respShow.TopoType)
+	require.Equal(t, topoServer, respShow.TopoServer)
+	require.Equal(t, topoRoot, respShow.TopoRoot)
+
+	respUnregister, err := s.MountUnregister(ctx, &vtctldatapb.MountUnregisterRequest{
+		Name: extCluster,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, respUnregister)
+
+	respList, err = s.MountList(ctx, &vtctldatapb.MountListRequest{})
+	require.NoError(t, err)
+	require.NotNil(t, respList)
+	require.Nil(t, respList.Names)
+}
diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go
index 1bb2f065e0f..6353f36db9f 100644
--- a/go/vt/vtctl/workflow/resharder_test.go
+++ b/go/vt/vtctl/workflow/resharder_test.go
@@ -84,7 +84,7 @@ func TestReshardCreate(t *testing.T) {
 					{
 						Id:     1,
 						Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID},
-						SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s",
+						SourceShard: "targetks/0", Position: gtid(position), Status: "Running", Info: "VStream Lag: 0s",
 					},
 				},
 			},
@@ -93,7 +93,7 @@ func TestReshardCreate(t *testing.T) {
 					{
 						Id:     1,
 						Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep},
-						SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s",
+						SourceShard: "targetks/0", Position: gtid(position), Status: "Running", Info: "VStream Lag: 0s",
 					},
 				},
 			},
diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go
index 38ae10280f7..5e9c2a79038 100644
--- a/go/vt/vtctl/workflow/stream_migrator_test.go
+++ b/go/vt/vtctl/workflow/stream_migrator_test.go
@@ -19,17 +19,22 @@ package workflow
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/key"
+	"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
 	"vitess.io/vitess/go/vt/sqlparser"
-
+	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/vtgate/vindexes"
 	"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
 
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
 )
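As a reading aid for the TestMount flow above (register, list, show, unregister), here is a toy in-memory registry showing the lifecycle shape; the type and methods are hypothetical, not Vitess APIs:

package main

import "fmt"

type mount struct{ topoType, topoServer, topoRoot string }

type registry map[string]mount

func (r registry) register(name string, m mount) { r[name] = m }

func (r registry) list() []string {
	names := make([]string, 0, len(r))
	for name := range r {
		names = append(names, name)
	}
	return names
}

func (r registry) unregister(name string) { delete(r, name) }

func main() {
	r := registry{}
	r.register("extcluster", mount{"etcd2", "localhost:2379", "/vitess/global"})
	fmt.Println(r.list())        // [extcluster]
	fmt.Println(r["extcluster"]) // {etcd2 localhost:2379 /vitess/global}
	r.unregister("extcluster")
	fmt.Println(len(r.list())) // 0
}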
"vitess.io/vitess/go/vt/proto/vschema" ) @@ -347,3 +352,271 @@ func stringifyVRS(streams []*VReplicationStream) string { b, _ := json.Marshal(converted) return string(b) } + +var testVSchema = &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, + }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, + }, + "ref": { + Type: vindexes.TypeReference, + }, + }, +} + +var ( + commerceKeyspace = &testKeyspace{ + KeyspaceName: "commerce", + ShardNames: []string{"0"}, + } + customerUnshardedKeyspace = &testKeyspace{ + KeyspaceName: "customer", + ShardNames: []string{"0"}, + } + customerShardedKeyspace = &testKeyspace{ + KeyspaceName: "customer", + ShardNames: []string{"-80", "80-"}, + } +) + +type streamMigratorEnv struct { + tenv *testEnv + ts *testTrafficSwitcher + sourceTabletIds []int + targetTabletIds []int +} + +func (env *streamMigratorEnv) close() { + env.tenv.close() +} + +func (env *streamMigratorEnv) addSourceQueries(queries []string) { + for _, id := range env.sourceTabletIds { + for _, q := range queries { + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } +} + +func (env *streamMigratorEnv) addTargetQueries(queries []string) { + for _, id := range env.targetTabletIds { + for _, q := range queries { + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } +} + +func newStreamMigratorEnv(ctx context.Context, t *testing.T, sourceKeyspace, targetKeyspace *testKeyspace) *streamMigratorEnv { + tenv := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace) + env := &streamMigratorEnv{tenv: tenv} + + ksschema, err := vindexes.BuildKeyspaceSchema(testVSchema, "ks", sqlparser.NewTestParser()) + require.NoError(t, err, "could not create test keyspace %+v", testVSchema) + sources := make(map[string]*MigrationSource, len(sourceKeyspace.ShardNames)) + targets := make(map[string]*MigrationTarget, len(targetKeyspace.ShardNames)) + for i, shard := range sourceKeyspace.ShardNames { + tablet := tenv.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] + kr, _ := key.ParseShardingSpec(shard) + sources[shard] = &MigrationSource{ + si: topo.NewShardInfo(sourceKeyspace.KeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + env.sourceTabletIds = append(env.sourceTabletIds, int(tablet.Alias.Uid)) + } + for i, shard := range targetKeyspace.ShardNames { + tablet := tenv.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] + kr, _ := key.ParseShardingSpec(shard) + targets[shard] = &MigrationTarget{ + si: topo.NewShardInfo(targetKeyspace.KeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + env.targetTabletIds = append(env.targetTabletIds, int(tablet.Alias.Uid)) + } + ts := &testTrafficSwitcher{ + trafficSwitcher: trafficSwitcher{ + migrationType: binlogdatapb.MigrationType_SHARDS, + workflow: "wf1", + id: 1, + sources: sources, + targets: targets, + sourceKeyspace: sourceKeyspace.KeyspaceName, + targetKeyspace: targetKeyspace.KeyspaceName, + sourceKSSchema: ksschema, + workflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + ws: tenv.ws, + }, + sourceKeyspaceSchema: ksschema, + } + env.ts = ts + + return env +} + +func 
+func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sourceShard string) {
+	var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse
+	wfName := "wfMat1"
+	wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{
+		Workflow:     wfName,
+		WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize,
+	})
+	wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{
+		Id: id,
+		Bls: &binlogdatapb.BinlogSource{
+			Keyspace: env.tenv.sourceKeyspace.KeyspaceName,
+			Shard:    sourceShard,
+			Filter: &binlogdatapb.Filter{
+				Rules: []*binlogdatapb.Rule{
+					{Match: "t1", Filter: "select * from t1"},
+				},
+			},
+		},
+		Pos:   position,
+		State: binlogdatapb.VReplicationWorkflowState_Running,
+	})
+	workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard)
+	workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{
+		nil,              // this is the response for getting stopped workflows
+		&wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls
+	}
+	for _, resp := range workflowResponses {
+		env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp)
+	}
+	queries := []string{
+		fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id),
+		fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in (%d)", id),
+		fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')",
+			env.tenv.sourceKeyspace.KeyspaceName, wfName),
+	}
+	env.addSourceQueries(queries)
+	queries = []string{
+		fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')",
+			env.tenv.sourceKeyspace.KeyspaceName, wfName),
+	}
+	env.addTargetQueries(queries)
+
+}
+
+func addReferenceWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sourceShard string) {
+	var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse
+	wfName := "wfRef1"
+	wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{
+		Workflow:     wfName,
+		WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize,
+	})
+	wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{
+		Id: id,
+		Bls: &binlogdatapb.BinlogSource{
+			Keyspace: env.tenv.sourceKeyspace.KeyspaceName,
+			Shard:    sourceShard,
+			Filter: &binlogdatapb.Filter{
+				Rules: []*binlogdatapb.Rule{
+					{Match: "ref", Filter: "select * from ref"},
+				},
+			},
+		},
+		Pos:   position,
+		State: binlogdatapb.VReplicationWorkflowState_Running,
+	})
+	workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard)
+	workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{
+		nil,              // this is the response for getting stopped workflows
+		&wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls
+	}
+	for _, resp := range workflowResponses {
+		env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp)
+	}
+}
+
+func TestBuildStreamMigratorOneMaterialize(t *testing.T) {
+	ctx := context.Background()
+	env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace)
+	defer env.close()
+	tmc := env.tenv.tmc
+
+	addMaterializeWorkflow(t, env, 100, "0")
+
+	// FIXME: this is currently not optimal: we create two streams for each shard from all the
+	// shards even if the key ranges don't intersect. TBD.
+	getInsert := func(shard string) string {
+		s := "/insert into _vt.vreplication.*"
+		s += fmt.Sprintf("shard:\"-80\".*in_keyrange.*c1.*%s.*", shard)
+		s += fmt.Sprintf("shard:\"80-\".*in_keyrange.*c1.*%s.*", shard)
+		return s
+	}
+	tmc.expectVRQuery(200, getInsert("-80"), &sqltypes.Result{})
+	tmc.expectVRQuery(210, getInsert("80-"), &sqltypes.Result{})
+
+	sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser())
+	require.NoError(t, err)
+	require.NotNil(t, sm)
+	require.NotNil(t, sm.streams)
+	require.Equal(t, 1, len(sm.streams))
+
+	workflows, err := sm.StopStreams(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(workflows))
+	require.NoError(t, sm.MigrateStreams(ctx))
+	require.Len(t, sm.templates, 1)
+	env.addTargetQueries([]string{
+		fmt.Sprintf("update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow in ('%s')",
+			env.tenv.sourceKeyspace.KeyspaceName, "wfMat1"),
+	})
+	require.NoError(t, StreamMigratorFinalize(ctx, env.ts, []string{"wfMat1"}))
+}
+
+func TestBuildStreamMigratorNoStreams(t *testing.T) {
+	ctx := context.Background()
+	env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace)
+	defer env.close()
+
+	sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser())
+	require.NoError(t, err)
+	require.NotNil(t, sm)
+	require.NotNil(t, sm.streams)
+	require.Equal(t, 0, len(sm.streams))
+
+	workflows, err := sm.StopStreams(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 0, len(workflows))
+	require.NoError(t, sm.MigrateStreams(ctx))
+	require.Len(t, sm.templates, 0)
+}
+
+func TestBuildStreamMigratorRefStream(t *testing.T) {
+	ctx := context.Background()
+	env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace)
+	defer env.close()
+
+	addReferenceWorkflow(t, env, 100, "0")
+
+	sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser())
+	require.NoError(t, err)
+	require.NotNil(t, sm)
+	require.NotNil(t, sm.streams)
+	require.Equal(t, 0, len(sm.streams))
+
+	workflows, err := sm.StopStreams(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 0, len(workflows))
+	require.NoError(t, sm.MigrateStreams(ctx))
+	require.Len(t, sm.templates, 0)
+}
diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go
index d79c4710b77..b315e1aa991 100644
--- a/go/vt/vtctl/workflow/utils_test.go
+++ b/go/vt/vtctl/workflow/utils_test.go
@@ -16,12 +16,85 @@ import (
 	"vitess.io/vitess/go/testfiles"
 	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/proto/vtctldata"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/etcd2topo"
 	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/topotools"
 )
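The test added just below asserts a specific rule shape: each target-keyspace shard routes back to the source keyspace until traffic is switched. A standalone sketch of that expectation (helper name is illustrative):

package main

import "fmt"

// defaultShardRoutingRules builds the map the test's getExpectedRules helper
// expects: "targetks.<shard>" -> "sourceks" for every target shard.
func defaultShardRoutingRules(sourceKs, targetKs string, shards []string) map[string]string {
	rules := make(map[string]string, len(shards))
	for _, shard := range shards {
		rules[fmt.Sprintf("%s.%s", targetKs, shard)] = sourceKs
	}
	return rules
}

func main() {
	fmt.Println(defaultShardRoutingRules("sourceks", "targetks", []string{"-80", "80-"}))
	// map[targetks.-80:sourceks targetks.80-:sourceks]
}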
+
+// TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded
+// and unsharded keyspaces.
+func TestCreateDefaultShardRoutingRules(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ks1 := &testKeyspace{
+		KeyspaceName: "sourceks",
+	}
+	ks2 := &testKeyspace{
+		KeyspaceName: "targetks",
+	}
+
+	type testCase struct {
+		name           string
+		sourceKeyspace *testKeyspace
+		targetKeyspace *testKeyspace
+		shards         []string
+		want           map[string]string
+	}
+	getExpectedRules := func(sourceKeyspace, targetKeyspace *testKeyspace) map[string]string {
+		rules := make(map[string]string)
+		for _, targetShard := range targetKeyspace.ShardNames {
+			rules[fmt.Sprintf("%s.%s", targetKeyspace.KeyspaceName, targetShard)] = sourceKeyspace.KeyspaceName
+		}
+		return rules
+	}
+	testCases := []testCase{
+		{
+			name:           "unsharded",
+			sourceKeyspace: ks1,
+			targetKeyspace: ks2,
+			shards:         []string{"0"},
+		},
+		{
+			name:           "sharded",
+			sourceKeyspace: ks2,
+			targetKeyspace: ks1,
+			shards:         []string{"-80", "80-"},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			tc.sourceKeyspace.ShardNames = tc.shards
+			tc.targetKeyspace.ShardNames = tc.shards
+			env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace)
+			defer env.close()
+			ms := &vtctldata.MaterializeSettings{
+				Workflow:       "wf1",
+				SourceKeyspace: tc.sourceKeyspace.KeyspaceName,
+				TargetKeyspace: tc.targetKeyspace.KeyspaceName,
+				TableSettings: []*vtctldata.TableMaterializeSettings{
+					{
+						TargetTable:      "t1",
+						SourceExpression: "select * from t1",
+					},
+				},
+				Cell:         "zone1",
+				SourceShards: tc.sourceKeyspace.ShardNames,
+			}
+			err := createDefaultShardRoutingRules(ctx, ms, env.ts)
+			require.NoError(t, err)
+			rules, err := topotools.GetShardRoutingRules(ctx, env.ts)
+			require.NoError(t, err)
+			require.Len(t, rules, len(tc.shards))
+			want := getExpectedRules(tc.sourceKeyspace, tc.targetKeyspace)
+			require.EqualValues(t, want, rules)
+		})
+	}
+}
+
 // TestUpdateKeyspaceRoutingRule confirms that the keyspace routing rules are updated correctly.
 func TestUpdateKeyspaceRoutingRule(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/go/vt/vtctl/workflow/vreplication_stream_test.go b/go/vt/vtctl/workflow/vreplication_stream_test.go
new file mode 100644
index 00000000000..6269cfa978e
--- /dev/null
+++ b/go/vt/vtctl/workflow/vreplication_stream_test.go
@@ -0,0 +1,52 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workflow
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+// TestVReplicationStreams tests various methods of VReplicationStreams.
+func TestVReplicationStreams(t *testing.T) {
+	var streams VReplicationStreams
+	for i := 1; i <= 3; i++ {
+		streams = append(streams, &VReplicationStream{ID: int32(i), Workflow: fmt.Sprintf("workflow%d", i)})
+	}
+
+	tests := []struct {
+		name           string
+		funcUnderTest  func(VReplicationStreams) interface{}
+		expectedResult interface{}
+	}{
+		{"Test IDs", func(s VReplicationStreams) interface{} { return s.IDs() }, []int32{1, 2, 3}},
+		{"Test Values", func(s VReplicationStreams) interface{} { return s.Values() }, "(1, 2, 3)"},
+		{"Test Workflows", func(s VReplicationStreams) interface{} { return s.Workflows() }, []string{"workflow1", "workflow2", "workflow3"}},
+		{"Test Copy", func(s VReplicationStreams) interface{} { return s.Copy() }, streams.Copy()},
+		{"Test ToSlice", func(s VReplicationStreams) interface{} { return s.ToSlice() }, []*VReplicationStream{streams[0], streams[1], streams[2]}},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := tt.funcUnderTest(streams)
+			if !reflect.DeepEqual(result, tt.expectedResult) {
+				t.Errorf("Failed %s: expected %v, got %v", tt.name, tt.expectedResult, result)
+			}
+		})
+	}
+}

From 4206c2a5dfa89694b6119166eb9a5af065bb4c17 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 21 Aug 2024 10:55:04 -0400
Subject: [PATCH 4/6] VReplication: Improve replication plan builder and event
 application errors (#16596)

Signed-off-by: Matt Lord
---
 .../vreplication/replicator_plan.go           |  2 +-
 .../tabletmanager/vreplication/vplayer.go     | 37 ++++++++++++++++++-
 .../vreplication/vplayer_flaky_test.go        | 27 ++++++++++++--
 3 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
index 91777f51b9c..bd41fd76419 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -89,7 +89,7 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
 	// select * construct was used. We need to use the field names.
 	tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields)
 	if err != nil {
-		return nil, err
+		return nil, vterrors.Wrapf(err, "failed to build replication plan for %s table", fieldEvent.TableName)
 	}
 	tplan.Fields = fieldEvent.Fields
 	return tplan, nil
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index 653cc713c8f..70bd8016b9d 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -558,7 +558,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			if err := vp.applyEvent(ctx, event, mustSave); err != nil {
 				if err != io.EOF {
 					vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
-					var table, tableLogMsg string
+					var table, tableLogMsg, gtidLogMsg string
 					switch {
 					case event.GetFieldEvent() != nil:
 						table = event.GetFieldEvent().TableName
@@ -568,7 +568,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 					if table != "" {
 						tableLogMsg = fmt.Sprintf(" for table %s", table)
 					}
-					log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
+					pos := getNextPosition(items, i, j+1)
+					if pos != "" {
+						gtidLogMsg = fmt.Sprintf(" while processing position %s", pos)
+					}
+					log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error())
+					err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg)
 				}
 				return err
 			}
@@ -602,6 +607,34 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool {
 	return false
 }
 
+// getNextPosition returns the GTID set/position we would be at if the current
+// transaction were committed. This is useful for error handling, as we can then
+// determine which GTID we're failing to process from the source and examine the
+// binlog events for that GTID directly on the source to debug the issue.
+// This is needed because it's not as simple as incrementing the current
+// position in the stream by 1, since we may be skipping N intermediate GTIDs in
+// the stream due to filtering. For GTIDs that we filter out, we still replicate the
+// GTID event itself, just without any internal events and a COMMIT event (see
+// the unsavedEvent handling).
+func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string {
+	for i < len(items) {
+		for j < len(items[i]) {
+			switch items[i][j].Type {
+			case binlogdatapb.VEventType_GTID:
+				pos, err := binlogplayer.DecodePosition(items[i][j].Gtid)
+				if err != nil {
+					return ""
+				}
+				return pos.String()
+			}
+			j++
+		}
+		j = 0
+		i++
+	}
+	return ""
+}
+
 func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error {
 	stats := NewVrLogStats(event.Type.String())
 	switch event.Type {
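The forward scan in getNextPosition is easy to test in isolation: starting just after the failing event, find the next GTID event across the batched items and report its position. A minimal runnable sketch with a hypothetical event type:

package main

import "fmt"

type event struct{ gtid string } // empty gtid means "not a GTID event"

func nextPosition(items [][]event, i, j int) string {
	for i < len(items) {
		for j < len(items[i]) {
			if g := items[i][j].gtid; g != "" {
				return g
			}
			j++
		}
		j = 0
		i++
	}
	return ""
}

func main() {
	items := [][]event{
		{{gtid: ""}, {gtid: ""}},
		{{gtid: ""}, {gtid: "uuid:1-100"}},
	}
	// Start scanning from items[0][1], i.e. just past the failing event.
	fmt.Println(nextPosition(items, 0, 1)) // uuid:1-100
}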
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
index b1925c3c64f..0641a111199 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
@@ -575,9 +575,20 @@ func TestPlayerForeignKeyCheck(t *testing.T) {
 	cancel()
 }
 
-func TestPlayerStatementModeWithFilter(t *testing.T) {
+// TestPlayerStatementModeWithFilterAndErrorHandling confirms that we get the
+// expected error when using a filter with statement mode. It also tests the
+// general vplayer applyEvent error and log message handling.
+func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {
 	defer deleteTablet(addTablet(100))
 
+	// We want to check for the expected log message.
+	ole := log.Errorf
+	logger := logutil.NewMemoryLogger()
+	log.Errorf = logger.Errorf
+	defer func() {
+		log.Errorf = ole
+	}()
+
 	execStatements(t, []string{
 		"create table src1(id int, val varbinary(128), primary key(id))",
 	})
@@ -600,21 +611,29 @@ func TestPlayerStatementModeWithFilter(t *testing.T) {
 	cancel, _ := startVReplication(t, bls, "")
 	defer cancel()
 
+	const gtid = "37f16b4c-5a74-11ef-87de-56bfd605e62e:100"
 	input := []string{
 		"set @@session.binlog_format='STATEMENT'",
+		fmt.Sprintf("set @@session.gtid_next='%s'", gtid),
 		"insert into src1 values(1, 'aaa')",
+		"set @@session.gtid_next='AUTOMATIC'",
 		"set @@session.binlog_format='ROW'",
 	}
 
+	expectedMsg := fmt.Sprintf("[Ee]rror applying event while processing position .*%s.* filter rules are not supported for SBR.*", gtid)
+
 	// It does not work when filter is enabled
 	output := qh.Expect(
 		"begin",
 		"rollback",
-		"/update _vt.vreplication set message='filter rules are not supported for SBR",
+		fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
 	)
 
 	execStatements(t, input)
 	expectDBClientQueries(t, output)
+
+	logs := logger.String()
+	require.Regexp(t, expectedMsg, logs)
 }
 
 func TestPlayerStatementMode(t *testing.T) {
@@ -1758,8 +1777,8 @@ func TestPlayerDDL(t *testing.T) {
 	execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
 	expectDBClientQueries(t, qh.Expect(
 		"alter table t1 add column val2 varchar(128)",
-		"/update _vt.vreplication set message='Duplicate",
-		"/update _vt.vreplication set state='Error', message='Duplicate",
+		"/update _vt.vreplication set message='error applying event: Duplicate",
+		"/update _vt.vreplication set state='Error', message='error applying event: Duplicate",
 	))
 	cancel()

From 1a0e7049ae7c7312af9b6829dc481e163abedf2f Mon Sep 17 00:00:00 2001
From: Florent Poinsard <35779988+frouioui@users.noreply.github.com>
Date: Wed, 21 Aug 2024 09:00:31 -0600
Subject: [PATCH 5/6] Fix go.mod files (#16625)

Signed-off-by: Florent Poinsard
---
 go.mod              | 2 +-
 vitess-mixin/go.mod | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index 4547fd7e0c0..981cacb34c6 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module vitess.io/vitess
 
-go 1.23
+go 1.23.0
 
 require (
 	cloud.google.com/go/storage v1.43.0
diff --git a/vitess-mixin/go.mod b/vitess-mixin/go.mod
index a3a98d72d63..5659ed4be57 100644
--- a/vitess-mixin/go.mod
+++ b/vitess-mixin/go.mod
@@ -1,6 +1,6 @@
 module vitess-mixin
 
-go 1.23
+go 1.23.0
 
 require (
 	github.com/evanphx/json-patch v5.9.0+incompatible
@@ -130,4 +130,3 @@ require (
 )
 
 replace k8s.io/client-go v2.0.0-alpha.0.0.20181121191925-a47917edff34+incompatible => k8s.io/client-go v2.0.0-alpha.1+incompatible
-e

From 538dd4c4540089bbf9fa6e2f0ea2bb97352a6999 Mon Sep 17 00:00:00 2001
From: shanth96
Date: Wed, 21 Aug 2024 11:18:36 -0400
Subject: [PATCH 6/6] Fix query plan cache misses metric (#16562)

Signed-off-by: shanth96
---
 go/vt/vtgate/executor.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go
index 5dfbe4faba9..093246c71c5 100644
--- a/go/vt/vtgate/executor.go
+++ b/go/vt/vtgate/executor.go
@@ -208,7 +208,7 @@ func NewExecutor(
 		return e.plans.Metrics.Hits()
 	})
 	stats.NewCounterFunc("QueryPlanCacheMisses", "Query plan cache misses", func() int64 {
-		return e.plans.Metrics.Hits()
+		return e.plans.Metrics.Misses()
 	})
 	servenv.HTTPHandle(pathQueryPlans, e)
 	servenv.HTTPHandle(pathScatterStats, e)
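The one-line fix in this last patch is easy to miss in the diff: the misses counter was wired to Hits(), so both metrics always reported the same value. A small sketch of the corrected wiring with a hypothetical cache-metrics struct:

package main

import "fmt"

type cacheMetrics struct{ hits, misses int64 }

func (m *cacheMetrics) Hits() int64   { return m.hits }
func (m *cacheMetrics) Misses() int64 { return m.misses }

func main() {
	m := &cacheMetrics{hits: 10, misses: 3}
	queryPlanCacheHits := m.Hits
	queryPlanCacheMisses := m.Misses // previously bound to m.Hits by mistake
	fmt.Println(queryPlanCacheHits(), queryPlanCacheMisses()) // 10 3
}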