Skip to content

Commit

Permalink
wip - add multi shards test case
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed Feb 3, 2025
1 parent a41541a commit 2af7e91
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 9 deletions.
74 changes: 74 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package newfeaturetest
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
Expand Down Expand Up @@ -234,3 +237,74 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}

func TestBufferingErrorInTransaction(t *testing.T) {
clusterInstance := utils.SetupShardedReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)

// err := clusterInstance.StartVtgate()
// require.NoError(t, err)

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
// Confirm that the replication is setup correctly in the beginning.
// tablets[0] is the primary tablet in the beginning.
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2]})

// Start a transaction to the primary.
vtParams := clusterInstance.GetVTParams(keyspace.Name)
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
_, err = conn.ExecuteFetch("begin", 0, false)
require.NoError(t, err)
idx := 20
_, err = conn.ExecuteFetch(utils.GetInsertMultipleValuesQuery(idx, idx+1, idx+2, idx+3), 0, false)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
default:
idx += 5
_, err = conn.ExecuteFetch(utils.GetInsertMultipleValuesQuery(idx, idx+1, idx+2, idx+3), 0, false)
log.Errorf("Error received - %v", err)
if err != nil {
if strings.Contains(err.Error(), "transaction rolled back to reverse changes") {
log.Errorf("We have rolled back - issuing a new begin")
_, err = conn.ExecuteFetch("begin", 0, false)
require.NoError(t, err)
}
}
time.Sleep(100 * time.Millisecond)
}
}
}()

// Reparent to the other replica
utils.ShardName = "-80"
defer func() {
utils.ShardName = "0"
}()

output, err := utils.Prs(t, clusterInstance, tablets[1])
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)

time.Sleep(1 * time.Minute)

// We now restart the vttablet that became a replica.
utils.StopTablet(t, tablets[0], false)

time.Sleep(1 * time.Minute)

tablets[0].VttabletProcess.ServingStatus = "SERVING"
err = tablets[0].VttabletProcess.Setup()
require.NoError(t, err)

time.Sleep(1 * time.Minute)
cancel()
}
24 changes: 15 additions & 9 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ import (
)

var (
KeyspaceName = "ks"
dbName = "vt_" + KeyspaceName
username = "vt_dba"
Hostname = "localhost"
insertVal = 1
insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"
sqlSchema = `
KeyspaceName = "ks"
dbName = "vt_" + KeyspaceName
username = "vt_dba"
Hostname = "localhost"
insertVal = 1
insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"
insertSQLMultipleValues = "insert into vt_insert_test(id, msg) values (%d, 'test %d'), (%d, 'test %d'), (%d, 'test %d'), (%d, 'test %d')"
sqlSchema = `
create table vt_insert_test (
id bigint,
msg varchar(64),
Expand Down Expand Up @@ -100,10 +101,10 @@ func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.Local
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "reverse_bits"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
DurabilityPolicy: durability,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 2, false)
require.NoError(t, err)

// Start Vtgate
Expand All @@ -117,6 +118,11 @@ func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetInsertMultipleValuesQuery returns a built insert query to insert multiple rows at once.
func GetInsertMultipleValuesQuery(idx1, idx2, idx3, idx4 int) string {
return fmt.Sprintf(insertSQLMultipleValues, idx1, idx1, idx2, idx2, idx3, idx3, idx4, idx4)
}

// GetSelectionQuery returns a built selection query read the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
Expand Down

0 comments on commit 2af7e91

Please sign in to comment.