Skip to content

Commit

Permalink
Merge branch 'slack-19.0' into slack-19.0-vtgate-journal-events
Browse files Browse the repository at this point in the history
  • Loading branch information
makinje16 authored Feb 19, 2025
2 parents 498335a + 1c36ad8 commit 9576b5f
Show file tree
Hide file tree
Showing 31 changed files with 3,415 additions and 1,727 deletions.
5 changes: 5 additions & 0 deletions go/test/endtoend/tabletmanager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func tmcPrimaryPosition(ctx context.Context, tabletGrpcPort int) (string, error)
return tmClient.PrimaryPosition(ctx, vtablet)
}

func tmcGetGlobalStatusVars(ctx context.Context, tabletGrpcPort int, variables []string) (map[string]string, error) {
vtablet := getTablet(tabletGrpcPort)
return tmClient.GetGlobalStatusVars(ctx, vtablet, variables)
}

func tmcStartReplicationUntilAfter(ctx context.Context, tabletGrpcPort int, positon string, waittime time.Duration) error {
vtablet := getTablet(tabletGrpcPort)
return tmClient.StartReplicationUntilAfter(ctx, vtablet, positon, waittime)
Expand Down
29 changes: 29 additions & 0 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletmanager
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -100,3 +101,31 @@ func TestResetReplicationParameters(t *testing.T) {
require.NoError(t, err)
require.Len(t, res.Rows, 0)
}

// TestGetGlobalStatusVars tests the GetGlobalStatusVars RPC
func TestGetGlobalStatusVars(t *testing.T) {
ctx := context.Background()
statusValues, err := tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, []string{"Innodb_buffer_pool_pages_data", "unknown_value"})
require.NoError(t, err)
require.Len(t, statusValues, 1)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")

statusValues, err = tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, []string{"Uptime", "Innodb_buffer_pool_pages_data"})
require.NoError(t, err)
require.Len(t, statusValues, 2)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")
checkValueGreaterZero(t, statusValues, "Uptime")

statusValues, err = tmcGetGlobalStatusVars(ctx, replicaTablet.GrpcPort, nil)
require.NoError(t, err)
require.Greater(t, len(statusValues), 250)
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_data")
checkValueGreaterZero(t, statusValues, "Innodb_buffer_pool_pages_free")
checkValueGreaterZero(t, statusValues, "Uptime")
}

func checkValueGreaterZero(t *testing.T, statusValues map[string]string, val string) {
valInMap, err := strconv.Atoi(statusValues[val])
require.NoError(t, err)
require.Greater(t, valInMap, 0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,16 @@ func TestGroupBy(t *testing.T) {
mcmp.Exec("insert into t3(id5, id6, id7) values(1,1,2), (2,2,4), (3,2,4), (4,1,2), (5,1,2), (6,3,6)")
// test ordering and group by int column
mcmp.AssertMatches("select id6, id7, count(*) k from t3 group by id6, id7 order by k", `[[INT64(3) INT64(6) INT64(1)] [INT64(2) INT64(4) INT64(2)] [INT64(1) INT64(2) INT64(3)]]`)
mcmp.AssertMatches("select id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)
if utils.BinaryIsAtLeastAtVersion(18, "vtgate") {
mcmp.AssertMatches("select id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)
}

// Test the same queries in streaming mode
utils.Exec(t, mcmp.VtConn, "set workload = olap")
mcmp.AssertMatches("select id6, id7, count(*) k from t3 group by id6, id7 order by k", `[[INT64(3) INT64(6) INT64(1)] [INT64(2) INT64(4) INT64(2)] [INT64(1) INT64(2) INT64(3)]]`)
mcmp.AssertMatches("select id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)
if utils.BinaryIsAtLeastAtVersion(18, "vtgate") {
mcmp.AssertMatches("select id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)
}
}

func TestEqualFilterOnScatter(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/queries/derived/derived_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestDerivedTableWithOrderByLimit(t *testing.T) {
}

func TestDerivedAggregationOnRHS(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 18, "vtgate")
mcmp, closer := start(t)
defer closer()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

func TestFoundRows(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 18, "vtgate")
defer cluster.PanicHandler(t)
mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams)
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, e
return nil, nil
}

// GetGlobalStatusVars is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error) {
return make(map[string]string), fmd.ExecuteSuperQueryList(ctx, []string{
"FAKE " + getGlobalStatusQuery,
})
}

// StartReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartReplication(hookExtraEnv map[string]string) error {
if fmd.StartReplicationError != nil {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type MysqlDaemon interface {
// GetServerUUID returns the servers UUID
GetServerUUID(ctx context.Context) (string, error)

// GetGlobalStatusVars returns the server's global status variables asked for.
// An empty/nil variable name parameter slice means you want all of them.
GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error)

// replication related methods
StartReplication(hookExtraEnv map[string]string) error
RestartReplication(hookExtraEnv map[string]string) error
Expand Down
40 changes: 40 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,19 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/log"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

const (
// Queries used for RPCs
getGlobalStatusQuery = "SELECT variable_name, variable_value FROM performance_schema.global_status"
)

type ResetSuperReadOnlyFunc func() error

// WaitForReplicationStart waits until the deadline for replication to start.
Expand Down Expand Up @@ -216,6 +224,38 @@ func (mysqld *Mysqld) GetServerUUID(ctx context.Context) (string, error) {
return conn.Conn.GetServerUUID()
}

// GetGlobalStatusVars returns the server's global status variables asked for.
// An empty/nil variable name parameter slice means you want all of them.
func (mysqld *Mysqld) GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error) {
query := getGlobalStatusQuery
if len(variables) != 0 {
// The format specifier is for any optional predicates.
statusBv, err := sqltypes.BuildBindVariable(variables)
if err != nil {
return nil, err
}
query, err = sqlparser.ParseAndBind(getGlobalStatusQuery+" WHERE variable_name IN %a",
statusBv,
)
if err != nil {
return nil, err
}
}
qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return nil, err
}

finalRes := make(map[string]string, len(qr.Rows))
for _, row := range qr.Rows {
if len(row) != 2 {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "incorrect number of fields in the row")
}
finalRes[row[0].ToString()] = row[1].ToString()
}
return finalRes, nil
}

// IsReadOnly return true if the instance is read only
func (mysqld *Mysqld) IsReadOnly() (bool, error) {
qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW VARIABLES LIKE 'read_only'")
Expand Down
Loading

0 comments on commit 9576b5f

Please sign in to comment.