diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 5f5d5efbe16..9a46562d1c7 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) + cleanup(t) return conn, func() { conn.Close() @@ -121,6 +122,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { func cleanup(t *testing.T) { cluster.PanicHandler(t) utils.ClearOutTable(t, vtParams, "twopc_user") + utils.ClearOutTable(t, vtParams, "twopc_t1") } type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 2336c553502..7c289a03c2a 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -1,12 +1,21 @@ -create table twopc_user ( - id bigint, +create table twopc_user +( + id bigint, name varchar(64), primary key (id) ) Engine=InnoDB; -create table twopc_music ( - id varchar(64), +create table twopc_music +( + id varchar(64), user_id bigint, - title varchar(64), + title varchar(64), primary key (id) ) Engine=InnoDB; + +create table twopc_t1 +( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 46eb8e5af7c..b6e978e7aac 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -28,88 +28,17 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/syscallutil" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/log" ) -const ( - DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" - DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" -) - -// TestReadingUnresolvedTransactions tests the reading of unresolved transactions -func TestReadingUnresolvedTransactions(t *testing.T) { - testcases := []struct { - name string - queries []string - }{ - { - name: "show transaction status for explicit keyspace", - queries: []string{ - fmt.Sprintf("show unresolved transactions for %v", keyspaceName), - }, - }, - { - name: "show transaction status with use command", - queries: []string{ - fmt.Sprintf("use %v", keyspaceName), - "show unresolved transactions", - }, - }, - } - for _, testcase := range testcases { - t.Run(testcase.name, func(t *testing.T) { - conn, closer := start(t) - defer closer() - // Start an atomic transaction. - utils.Exec(t, conn, "begin") - // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits - // it is very easy to figure out what value will end up in which shard. - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") - // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, "5") - defer deleteFile(DebugDelayCommitTime) - // We will execute a commit in a go routine, because we know it will take some time to complete. - // While the commit is ongoing, we would like to check that we see the unresolved transaction. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _, err := utils.ExecAllowError(t, conn, "commit") - if err != nil { - log.Errorf("Error in commit - %v", err) - } - }() - // Allow enough time for the commit to have started. - time.Sleep(1 * time.Second) - var lastRes *sqltypes.Result - newConn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer newConn.Close() - for _, query := range testcase.queries { - lastRes = utils.Exec(t, newConn, query) - } - require.NotNil(t, lastRes) - require.Len(t, lastRes.Rows, 1) - // This verifies that we already decided to commit the transaction, but it is still unresolved. - assert.Contains(t, fmt.Sprintf("%v", lastRes.Rows), `VARCHAR("COMMIT")`) - // Wait for the commit to have returned. - wg.Wait() - }) - } -} - // TestDisruptions tests that atomic transactions persevere through various disruptions. func TestDisruptions(t *testing.T) { testcases := []struct { @@ -161,10 +90,10 @@ func TestDisruptions(t *testing.T) { utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val)) } // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, tt.commitDelayTime) - defer deleteFile(DebugDelayCommitTime) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime) + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) // We will execute a commit in a go routine, because we know it will take some time to complete. // While the commit is ongoing, we would like to run the disruption. var wg sync.WaitGroup @@ -229,18 +158,6 @@ func reparentToFistTablet(t *testing.T) { } } -// writeTestCommunicationFile writes the content to the file with the given name. -// We use these files to coordinate with the vttablets running in the debug mode. -func writeTestCommunicationFile(t *testing.T, fileName string, content string) { - err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) - require.NoError(t, err) -} - -// deleteFile deletes the file specified. -func deleteFile(fileName string) { - _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) -} - // waitForResults waits for the results of the query to be as expected. func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { timeout := time.After(waitTime) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 98bc158c4da..ce104fa94ec 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -955,3 +956,69 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio assert.Equal(t, txParticipants, tx.participants) } } + +// TestReadingUnresolvedTransactions tests the reading of unresolved transactions +func TestReadingUnresolvedTransactions(t *testing.T) { + testcases := []struct { + name string + queries []string + }{ + { + name: "show transaction status for explicit keyspace", + queries: []string{ + fmt.Sprintf("show unresolved transactions for %v", keyspaceName), + }, + }, + { + name: "show transaction status with use command", + queries: []string{ + fmt.Sprintf("use %v", keyspaceName), + "show unresolved transactions", + }, + }, + } + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + conn, closer := start(t) + defer closer() + // Start an atomic transaction. + utils.Exec(t, conn, "begin") + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, "5") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to check that we see the unresolved transaction. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + fmt.Println("Error in commit: ", err.Error()) + } + }() + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + var lastRes *sqltypes.Result + newConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer newConn.Close() + for _, query := range testcase.queries { + lastRes = utils.Exec(t, newConn, query) + } + require.NotNil(t, lastRes) + require.Len(t, lastRes.Rows, 1) + // This verifies that we already decided to commit the transaction, but it is still unresolved. + assert.Contains(t, fmt.Sprintf("%v", lastRes.Rows), `VARCHAR("COMMIT")`) + // Wait for the commit to have returned. + wg.Wait() + }) + } +} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 1f99a4cb89b..b3b8796accf 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -19,13 +19,19 @@ package utils import ( "context" "fmt" + "os" + "path" "testing" "time" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/log" +) + +const ( + DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" + DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" ) // ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, @@ -35,13 +41,13 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { for { conn, err := mysql.Connect(ctx, &vtParams) if err != nil { - log.Errorf("Error in connection - %v", err) + fmt.Printf("Error in connection - %v\n", err) continue } res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) if err != nil { - log.Errorf("Error in selecting - %v", err) + fmt.Printf("Error in selecting - %v\n", err) conn.Close() time.Sleep(100 * time.Millisecond) continue @@ -56,10 +62,22 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { } _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) if err != nil { - log.Errorf("Error in cleanup deletion - %v", err) + fmt.Printf("Error in cleanup deletion - %v\n", err) conn.Close() time.Sleep(100 * time.Millisecond) continue } } } + +// WriteTestCommunicationFile writes the content to the file with the given name. +// We use these files to coordinate with the vttablets running in the debug mode. +func WriteTestCommunicationFile(t *testing.T, fileName string, content string) { + err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) + require.NoError(t, err) +} + +// DeleteFile deletes the file specified. +func DeleteFile(fileName string) { + _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) +} diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index cdd17ebe1f0..bca58b05c1e 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -24,6 +24,14 @@ "name": "xxhash" } ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file