From 91add5aed7bed18fdd08396cbe16f838ff00e21d Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 9 Dec 2022 10:21:12 +0530 Subject: [PATCH] wip --- go/test/endtoend/cluster/vttablet_process.go | 15 ++ go/test/endtoend/tabletserver/main_test.go | 158 ++++++++++++++++++ go/test/endtoend/tabletserver/misc_test.go | 75 +++++++++ go/vt/vttablet/grpcqueryservice/server.go | 6 +- go/vt/vttablet/grpctabletconn/conn.go | 8 + .../vttablet/tabletserver/connpool/dbconn.go | 8 +- go/vt/vttablet/tabletserver/query_list.go | 5 +- 7 files changed, 272 insertions(+), 3 deletions(-) create mode 100644 go/test/endtoend/tabletserver/main_test.go create mode 100644 go/test/endtoend/tabletserver/misc_test.go diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index a5eba51f22d..1231d2745b4 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -176,6 +176,21 @@ func (vttablet *VttabletProcess) GetStatus() string { return "" } +// GetGoroutineDemp returns /debug/pprof/profile endpoint result +func (vttablet *VttabletProcess) GetGoroutineDump() string { + URL := fmt.Sprintf("http://%s:%d/debug/pprof/goroutine?debug=2", vttablet.TabletHostname, vttablet.Port) + resp, err := http.Get(URL) + if err != nil { + return "" + } + defer resp.Body.Close() + if resp.StatusCode == 200 { + respByte, _ := io.ReadAll(resp.Body) + return string(respByte) + } + return "" +} + // GetVars gets the debug vars as map func (vttablet *VttabletProcess) GetVars() map[string]any { resp, err := http.Get(vttablet.VerifyURL) diff --git a/go/test/endtoend/tabletserver/main_test.go b/go/test/endtoend/tabletserver/main_test.go new file mode 100644 index 00000000000..3f81ae03a2e --- /dev/null +++ b/go/test/endtoend/tabletserver/main_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver + +import ( + "context" + "flag" + "os" + "testing" + "time" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vttablet/grpctabletconn" + "vitess.io/vitess/go/vt/vttablet/queryservice" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + queryService queryservice.QueryService + primaryTablet cluster.Vttablet + primaryTarget *querypb.Target + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + tabletHealthcheckRefreshInterval = 5 * time.Second + tabletUnhealthyThreshold = tabletHealthcheckRefreshInterval * 2 + sqlSchema = ` + create table t1( + id bigint, + value varchar(16), + primary key(id) + ) Engine=InnoDB DEFAULT CHARSET=utf8; + CREATE VIEW v1 AS SELECT id, value FROM t1; +` + + vSchema = ` + { + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + } + } + }` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Set extra tablet args for lock timeout + clusterInstance.VtTabletExtraArgs = []string{ + "--lock_tables_timeout", "5s", + "--watch_replication_stream", + "--heartbeat_enable", + "--health_check_interval", tabletHealthcheckRefreshInterval.String(), + "--unhealthy_threshold", tabletUnhealthyThreshold.String(), + "--shutdown_grace_period", "2", + } + // We do not need semiSync for this test case. + clusterInstance.EnableSemiSync = false + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + + if err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false); err != nil { + return 1 + } + + // Collect table paths and ports + primaryTablet = *clusterInstance.Keyspaces[0].Shards[0].Vttablets[0] + + // create grpc client + queryService, err = grpctabletconn.DialTablet(getTablet(primaryTablet.GrpcPort), true) + if err != nil { + return 1 + } + + primaryTarget = &querypb.Target{ + Keyspace: keyspaceName, + Shard: primaryTablet.VttabletProcess.Shard, + TabletType: topodatapb.TabletType_PRIMARY, + Cell: cell, + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func getTablet(tabletGrpcPort int) *topodatapb.Tablet { + portMap := make(map[string]int32) + portMap["grpc"] = int32(tabletGrpcPort) + return &topodatapb.Tablet{Hostname: hostname, PortMap: portMap} +} + +func waitForCheckMySQLRunning(t *testing.T, valWanted float64) { + t.Helper() + timeout := time.After(30 * time.Second) + for { + _, err := queryService.Execute(context.Background(), primaryTarget, "select id, value from t1 where id = 1", nil, 0, 0, &querypb.ExecuteOptions{}) + status := primaryTablet.VttabletProcess.GetVars() + val, exists := status["CheckMySQLRunning"] + log.Errorf("%v, %v, %T, %v", exists, val, val, err) + if exists && val.(float64) == valWanted { + log.Errorf("returning from waitfor checkmysql") + return + } + + select { + case <-timeout: + t.Fatalf("CheckMySQL didn't run in the time provided") + default: + time.Sleep(1 * time.Second) + } + } +} diff --git a/go/test/endtoend/tabletserver/misc_test.go b/go/test/endtoend/tabletserver/misc_test.go new file mode 100644 index 00000000000..b4eec9af9bb --- /dev/null +++ b/go/test/endtoend/tabletserver/misc_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +// TabletReshuffle test if a vttablet can be pointed at an existing mysql +func TestTabletReshuffle(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + var x = 0 + for i := 0; i < 10000; i++ { + _, err := queryService.Execute(ctx, primaryTarget, fmt.Sprintf("insert into t1(id,value) values (%d, 'a'), (%d+1, 'a'), (%d+2, 'a'), (%d+3, 'a'), (%d+4, 'a'), (%d+5, 'a'), (%d+6, 'a'), (%d+7, 'a'), (%d+8, 'a'), (%d+9, 'a')", x, x, x, x, x, x, x, x, x, x), nil, 0, 0, &querypb.ExecuteOptions{}) + x += 10 + require.NoError(t, err) + } + + log.Errorf("started stream execute") + finished := make(chan bool) + go func() { + callbackCount := 0 + err := queryService.StreamExecute(ctx, primaryTarget, "select * from t1", nil, 0, 0, &querypb.ExecuteOptions{}, func(result *sqltypes.Result) error { + callbackCount++ + time.Sleep(20 * time.Second) + return nil + }) + log.Errorf("Error from streamExecute - %v", err) + assert.Equal(t, -1, callbackCount) + finished <- true + }() + + err := primaryTablet.MysqlctlProcess.Stop() + require.NoError(t, err) + defer func() { + _ = primaryTablet.MysqlctlProcess.Start() + }() + + waitForCheckMySQLRunning(t, 1) + select { + case <-finished: + log.Errorf("test finished") + return + case <-time.After(30 * time.Second): + time.Sleep(120 * time.Second) + t.Fatalf("StreamExecute didn't finish execution") + } +} diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index bbd8d3a08c0..6c63884c10c 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -19,6 +19,7 @@ package grpcqueryservice import ( "context" + "github.com/openark/golib/log" "google.golang.org/grpc" "vitess.io/vitess/go/sqltypes" @@ -65,9 +66,12 @@ func (q *query) StreamExecute(request *querypb.StreamExecuteRequest, stream quer request.ImmediateCallerId, ) err = q.server.StreamExecute(ctx, request.Target, request.Query.Sql, request.Query.BindVariables, request.TransactionId, request.ReservedId, request.Options, func(reply *sqltypes.Result) error { - return stream.Send(&querypb.StreamExecuteResponse{ + log.Errorf("Sending a stream now with %d rows", len(reply.Rows)) + err2 := stream.Send(&querypb.StreamExecuteResponse{ Result: sqltypes.ResultToProto3(reply), }) + log.Errorf("Finished sending a stream now, error = %v", err) + return err2 }) return vterrors.ToGRPC(err) } diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index a10d7d2a584..d3f74095b5b 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -21,6 +21,7 @@ import ( "io" "sync" + "github.com/openark/golib/log" "github.com/spf13/pflag" "google.golang.org/grpc" @@ -181,18 +182,25 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. var fields []*querypb.Field for { ser, err := stream.Recv() + rowslen := 0 + if ser != nil && ser.Result != nil { + rowslen = len(ser.Result.Rows) + } + log.Errorf("Received from stream - len = %v,err = %v", rowslen, err) if err != nil { return tabletconn.ErrorFromGRPC(err) } if fields == nil { fields = ser.Result.Fields } + log.Errorf("Started callback") if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil { if err == nil || err == io.EOF { return nil } return err } + log.Errorf("Finished callback") } } diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index c2fdf991056..59b58ca9f75 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -213,6 +213,9 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt resultSent := false for attempt := 1; attempt <= 2; attempt++ { + if query == "select * from t1" { + log.Errorf("Started stream once") + } err := dbc.streamOnce( ctx, query, @@ -226,6 +229,9 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt alloc, streamBufferSize, ) + if query == "select * from t1" { + log.Errorf("Error in stream once - %v", err) + } switch { case err == nil: // Success. @@ -267,7 +273,7 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(* done, wg := dbc.setDeadline(ctx) err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) - + log.Errorf("error from dbc.conn.ExecuteStreamFetch - %v", err) if done != nil { close(done) wg.Wait() diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index e78199c50ad..2d2971ac5f2 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -30,7 +30,10 @@ import ( // QueryDetail is a simple wrapper for Query, Context and a killable conn. type QueryDetail struct { - ctx context.Context + ctx context.Context + // cancel func for the context stored in the QueryDetail + // may be nil. nil check required before using + cancel context.CancelFunc conn killable connID int64 start time.Time