Skip to content

Commit

Permalink
chore: move non-stress test to different package
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Aug 19, 2024
1 parent 5270eb4 commit 040da43
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 97 deletions.
2 changes: 2 additions & 0 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
cleanup(t)

return conn, func() {
conn.Close()
Expand All @@ -121,6 +122,7 @@ func start(t *testing.T) (*mysql.Conn, func()) {
func cleanup(t *testing.T) {
cluster.PanicHandler(t)
utils.ClearOutTable(t, vtParams, "twopc_user")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}

type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value
Expand Down
19 changes: 14 additions & 5 deletions go/test/endtoend/transaction/twopc/schema.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
create table twopc_user (
id bigint,
create table twopc_user
(
id bigint,
name varchar(64),
primary key (id)
) Engine=InnoDB;

create table twopc_music (
id varchar(64),
create table twopc_music
(
id varchar(64),
user_id bigint,
title varchar(64),
title varchar(64),
primary key (id)
) Engine=InnoDB;

create table twopc_t1
(
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;
93 changes: 5 additions & 88 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,88 +28,17 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
)

const (
DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD"
DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME"
)

// TestReadingUnresolvedTransactions tests the reading of unresolved transactions
func TestReadingUnresolvedTransactions(t *testing.T) {
testcases := []struct {
name string
queries []string
}{
{
name: "show transaction status for explicit keyspace",
queries: []string{
fmt.Sprintf("show unresolved transactions for %v", keyspaceName),
},
},
{
name: "show transaction status with use command",
queries: []string{
fmt.Sprintf("use %v", keyspaceName),
"show unresolved transactions",
},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
// Start an atomic transaction.
utils.Exec(t, conn, "begin")
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
writeTestCommunicationFile(t, DebugDelayCommitShard, "80-")
defer deleteFile(DebugDelayCommitShard)
writeTestCommunicationFile(t, DebugDelayCommitTime, "5")
defer deleteFile(DebugDelayCommitTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to check that we see the unresolved transaction.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
var lastRes *sqltypes.Result
newConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer newConn.Close()
for _, query := range testcase.queries {
lastRes = utils.Exec(t, newConn, query)
}
require.NotNil(t, lastRes)
require.Len(t, lastRes.Rows, 1)
// This verifies that we already decided to commit the transaction, but it is still unresolved.
assert.Contains(t, fmt.Sprintf("%v", lastRes.Rows), `VARCHAR("COMMIT")`)
// Wait for the commit to have returned.
wg.Wait()
})
}
}

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
Expand Down Expand Up @@ -161,10 +90,10 @@ func TestDisruptions(t *testing.T) {
utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val))
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
writeTestCommunicationFile(t, DebugDelayCommitShard, "80-")
defer deleteFile(DebugDelayCommitShard)
writeTestCommunicationFile(t, DebugDelayCommitTime, tt.commitDelayTime)
defer deleteFile(DebugDelayCommitTime)
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
var wg sync.WaitGroup
Expand Down Expand Up @@ -229,18 +158,6 @@ func reparentToFistTablet(t *testing.T) {
}
}

// writeTestCommunicationFile writes the content to the file with the given name.
// We use these files to coordinate with the vttablets running in the debug mode.
func writeTestCommunicationFile(t *testing.T, fileName string, content string) {
err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644)
require.NoError(t, err)
}

// deleteFile deletes the file specified.
func deleteFile(fileName string) {
_ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName))
}

// waitForResults waits for the results of the query to be as expected.
func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) {
timeout := time.After(waitTime)
Expand Down
67 changes: 67 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -955,3 +956,69 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio
assert.Equal(t, txParticipants, tx.participants)
}
}

// TestReadingUnresolvedTransactions tests the reading of unresolved transactions
func TestReadingUnresolvedTransactions(t *testing.T) {
testcases := []struct {
name string
queries []string
}{
{
name: "show transaction status for explicit keyspace",
queries: []string{
fmt.Sprintf("show unresolved transactions for %v", keyspaceName),
},
},
{
name: "show transaction status with use command",
queries: []string{
fmt.Sprintf("use %v", keyspaceName),
"show unresolved transactions",
},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
// Start an atomic transaction.
utils.Exec(t, conn, "begin")
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, "5")
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to check that we see the unresolved transaction.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
fmt.Println("Error in commit: ", err.Error())
}
}()
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
var lastRes *sqltypes.Result
newConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer newConn.Close()
for _, query := range testcase.queries {
lastRes = utils.Exec(t, newConn, query)
}
require.NotNil(t, lastRes)
require.Len(t, lastRes.Rows, 1)
// This verifies that we already decided to commit the transaction, but it is still unresolved.
assert.Contains(t, fmt.Sprintf("%v", lastRes.Rows), `VARCHAR("COMMIT")`)
// Wait for the commit to have returned.
wg.Wait()
})
}
}
26 changes: 22 additions & 4 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ package utils
import (
"context"
"fmt"
"os"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

const (
DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD"
DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME"
)

// ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query,
Expand All @@ -35,13 +41,13 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
for {
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
log.Errorf("Error in connection - %v", err)
fmt.Printf("Error in connection - %v\n", err)
continue
}

res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false)
if err != nil {
log.Errorf("Error in selecting - %v", err)
fmt.Printf("Error in selecting - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
Expand All @@ -56,10 +62,22 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
if err != nil {
log.Errorf("Error in cleanup deletion - %v", err)
fmt.Printf("Error in cleanup deletion - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
}
}
}

// WriteTestCommunicationFile writes the content to the file with the given name.
// We use these files to coordinate with the vttablets running in the debug mode.
func WriteTestCommunicationFile(t *testing.T, fileName string, content string) {
err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644)
require.NoError(t, err)
}

// DeleteFile deletes the file specified.
func DeleteFile(fileName string) {
_ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName))
}
8 changes: 8 additions & 0 deletions go/test/endtoend/transaction/twopc/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
"name": "xxhash"
}
]
},
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}

0 comments on commit 040da43

Please sign in to comment.