Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
GuptaManan100 committed Dec 9, 2022
1 parent 0d9f531 commit 91add5a
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 3 deletions.
15 changes: 15 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ func (vttablet *VttabletProcess) GetStatus() string {
return ""
}

// GetGoroutineDemp returns /debug/pprof/profile endpoint result
func (vttablet *VttabletProcess) GetGoroutineDump() string {
URL := fmt.Sprintf("http://%s:%d/debug/pprof/goroutine?debug=2", vttablet.TabletHostname, vttablet.Port)
resp, err := http.Get(URL)
if err != nil {
return ""
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
respByte, _ := io.ReadAll(resp.Body)
return string(respByte)
}
return ""
}

// GetVars gets the debug vars as map
func (vttablet *VttabletProcess) GetVars() map[string]any {
resp, err := http.Get(vttablet.VerifyURL)
Expand Down
158 changes: 158 additions & 0 deletions go/test/endtoend/tabletserver/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"context"
"flag"
"os"
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/grpctabletconn"
"vitess.io/vitess/go/vt/vttablet/queryservice"
)

var (
clusterInstance *cluster.LocalProcessCluster
queryService queryservice.QueryService
primaryTablet cluster.Vttablet
primaryTarget *querypb.Target
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
tabletHealthcheckRefreshInterval = 5 * time.Second
tabletUnhealthyThreshold = tabletHealthcheckRefreshInterval * 2
sqlSchema = `
create table t1(
id bigint,
value varchar(16),
primary key(id)
) Engine=InnoDB DEFAULT CHARSET=utf8;
CREATE VIEW v1 AS SELECT id, value FROM t1;
`

vSchema = `
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}`
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}

// Set extra tablet args for lock timeout
clusterInstance.VtTabletExtraArgs = []string{
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--heartbeat_enable",
"--health_check_interval", tabletHealthcheckRefreshInterval.String(),
"--unhealthy_threshold", tabletUnhealthyThreshold.String(),
"--shutdown_grace_period", "2",
}
// We do not need semiSync for this test case.
clusterInstance.EnableSemiSync = false

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}

if err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false); err != nil {
return 1
}

// Collect table paths and ports
primaryTablet = *clusterInstance.Keyspaces[0].Shards[0].Vttablets[0]

// create grpc client
queryService, err = grpctabletconn.DialTablet(getTablet(primaryTablet.GrpcPort), true)
if err != nil {
return 1
}

primaryTarget = &querypb.Target{
Keyspace: keyspaceName,
Shard: primaryTablet.VttabletProcess.Shard,
TabletType: topodatapb.TabletType_PRIMARY,
Cell: cell,
}

return m.Run()
}()
os.Exit(exitCode)
}

func getTablet(tabletGrpcPort int) *topodatapb.Tablet {
portMap := make(map[string]int32)
portMap["grpc"] = int32(tabletGrpcPort)
return &topodatapb.Tablet{Hostname: hostname, PortMap: portMap}
}

func waitForCheckMySQLRunning(t *testing.T, valWanted float64) {
t.Helper()
timeout := time.After(30 * time.Second)
for {
_, err := queryService.Execute(context.Background(), primaryTarget, "select id, value from t1 where id = 1", nil, 0, 0, &querypb.ExecuteOptions{})
status := primaryTablet.VttabletProcess.GetVars()
val, exists := status["CheckMySQLRunning"]
log.Errorf("%v, %v, %T, %v", exists, val, val, err)
if exists && val.(float64) == valWanted {
log.Errorf("returning from waitfor checkmysql")
return
}

select {
case <-timeout:
t.Fatalf("CheckMySQL didn't run in the time provided")
default:
time.Sleep(1 * time.Second)
}
}
}
75 changes: 75 additions & 0 deletions go/test/endtoend/tabletserver/misc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
)

// TabletReshuffle test if a vttablet can be pointed at an existing mysql
func TestTabletReshuffle(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()

var x = 0
for i := 0; i < 10000; i++ {
_, err := queryService.Execute(ctx, primaryTarget, fmt.Sprintf("insert into t1(id,value) values (%d, 'a'), (%d+1, 'a'), (%d+2, 'a'), (%d+3, 'a'), (%d+4, 'a'), (%d+5, 'a'), (%d+6, 'a'), (%d+7, 'a'), (%d+8, 'a'), (%d+9, 'a')", x, x, x, x, x, x, x, x, x, x), nil, 0, 0, &querypb.ExecuteOptions{})
x += 10
require.NoError(t, err)
}

log.Errorf("started stream execute")
finished := make(chan bool)
go func() {
callbackCount := 0
err := queryService.StreamExecute(ctx, primaryTarget, "select * from t1", nil, 0, 0, &querypb.ExecuteOptions{}, func(result *sqltypes.Result) error {
callbackCount++
time.Sleep(20 * time.Second)
return nil
})
log.Errorf("Error from streamExecute - %v", err)
assert.Equal(t, -1, callbackCount)
finished <- true
}()

err := primaryTablet.MysqlctlProcess.Stop()
require.NoError(t, err)
defer func() {
_ = primaryTablet.MysqlctlProcess.Start()
}()

waitForCheckMySQLRunning(t, 1)
select {
case <-finished:
log.Errorf("test finished")
return
case <-time.After(30 * time.Second):
time.Sleep(120 * time.Second)
t.Fatalf("StreamExecute didn't finish execution")
}
}
6 changes: 5 additions & 1 deletion go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package grpcqueryservice
import (
"context"

"github.com/openark/golib/log"
"google.golang.org/grpc"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -65,9 +66,12 @@ func (q *query) StreamExecute(request *querypb.StreamExecuteRequest, stream quer
request.ImmediateCallerId,
)
err = q.server.StreamExecute(ctx, request.Target, request.Query.Sql, request.Query.BindVariables, request.TransactionId, request.ReservedId, request.Options, func(reply *sqltypes.Result) error {
return stream.Send(&querypb.StreamExecuteResponse{
log.Errorf("Sending a stream now with %d rows", len(reply.Rows))
err2 := stream.Send(&querypb.StreamExecuteResponse{
Result: sqltypes.ResultToProto3(reply),
})
log.Errorf("Finished sending a stream now, error = %v", err)
return err2
})
return vterrors.ToGRPC(err)
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"sync"

"github.com/openark/golib/log"
"github.com/spf13/pflag"
"google.golang.org/grpc"

Expand Down Expand Up @@ -181,18 +182,25 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
var fields []*querypb.Field
for {
ser, err := stream.Recv()
rowslen := 0
if ser != nil && ser.Result != nil {
rowslen = len(ser.Result.Rows)
}
log.Errorf("Received from stream - len = %v,err = %v", rowslen, err)
if err != nil {
return tabletconn.ErrorFromGRPC(err)
}
if fields == nil {
fields = ser.Result.Fields
}
log.Errorf("Started callback")
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err == nil || err == io.EOF {
return nil
}
return err
}
log.Errorf("Finished callback")
}
}

Expand Down
8 changes: 7 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt

resultSent := false
for attempt := 1; attempt <= 2; attempt++ {
if query == "select * from t1" {
log.Errorf("Started stream once")
}
err := dbc.streamOnce(
ctx,
query,
Expand All @@ -226,6 +229,9 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt
alloc,
streamBufferSize,
)
if query == "select * from t1" {
log.Errorf("Error in stream once - %v", err)
}
switch {
case err == nil:
// Success.
Expand Down Expand Up @@ -267,7 +273,7 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)

log.Errorf("error from dbc.conn.ExecuteStreamFetch - %v", err)
if done != nil {
close(done)
wg.Wait()
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/query_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (

// QueryDetail is a simple wrapper for Query, Context and a killable conn.
type QueryDetail struct {
ctx context.Context
ctx context.Context
// cancel func for the context stored in the QueryDetail
// may be nil. nil check required before using
cancel context.CancelFunc
conn killable
connID int64
start time.Time
Expand Down

0 comments on commit 91add5a

Please sign in to comment.