From 3cc3d88d9edae13af2fe8eb3e7dbab51b4aa6b73 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Fri, 28 Jun 2024 15:00:16 -0600 Subject: [PATCH 01/10] Add mysql connection drain Signed-off-by: Florent Poinsard --- go/flags/endtoend/vtgate.txt | 1 + go/test/endtoend/cluster/vtgate_process.go | 13 ++ go/vt/servenv/run.go | 2 +- .../endtoend/connectiondrain/main_test.go | 153 ++++++++++++++++++ .../endtoend/connectiondrain/schema.sql | 5 + go/vt/vtgate/plugin_mysql_server.go | 40 ++++- 6 files changed, 205 insertions(+), 9 deletions(-) create mode 100644 go/vt/vtgate/endtoend/connectiondrain/main_test.go create mode 100644 go/vt/vtgate/endtoend/connectiondrain/schema.sql diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 16f261194ca..68899cecd56 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -141,6 +141,7 @@ Flags: --mysql_ldap_auth_config_string string JSON representation of LDAP server config. --mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password") --mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance. + --mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain --mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms) --mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1) --mysql_server_query_timeout duration mysql query timeout diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index d1877fb89bb..298d4b111df 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -237,6 +237,19 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints return fmt.Errorf("wait for %s failed", name) } +// IsShutdown checks if the vtgate process is shutdown +func (vtgate *VtgateProcess) IsShutdown() bool { + return vtgate.proc.ProcessState.Exited() +} + +// Terminate sends a SIGTERM to vtgate +func (vtgate *VtgateProcess) Terminate() error { + if vtgate.proc == nil { + return nil + } + return vtgate.proc.Process.Signal(syscall.SIGTERM) +} + // TearDown shuts down the running vtgate service func (vtgate *VtgateProcess) TearDown() error { if vtgate.proc == nil || vtgate.exit == nil { diff --git a/go/vt/servenv/run.go b/go/vt/servenv/run.go index 29b15a40008..cef81e87a99 100644 --- a/go/vt/servenv/run.go +++ b/go/vt/servenv/run.go @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) { signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT) // Wait for signal <-ExitChan - l.Close() startTime := time.Now() log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod) @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) { log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain) time.Sleep(remain) } + l.Close() log.Info("Shutting down gracefully") fireOnCloseHooks(timeouts.OnCloseTimeout) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go new file mode 100644 index 00000000000..56276be417e --- /dev/null +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectiondrain + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone-1" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false) + if err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s") + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = clusterInstance.GetVTParams(keyspaceName) + return m.Run() + }() + os.Exit(exitCode) +} + +func start(t *testing.T) (*mysql.Conn, func()) { + vtConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + deleteAll := func() { + _, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp") + + tables := []string{"t1"} + for _, table := range tables { + _, _ = utils.ExecAllowError(t, vtConn, "delete from "+table) + } + } + + deleteAll() + + return vtConn, func() { + deleteAll() + vtConn.Close() + cluster.PanicHandler(t) + } +} + +func TestConnectionDrain(t *testing.T) { + vtConn, closer := start(t) + defer closer() + + // Create a second connection, this connection will be used to create a transaction. + vtConn2, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + // Start the transaction with the second connection + _, err = vtConn2.ExecuteFetch("BEGIN", 1, false) + require.NoError(t, err) + _, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + // Tearing down vtgate here, from there on vtConn should still be able to conclude in-flight transaction and + // execute queries with idle connections. However, no new connections are allowed. + err = clusterInstance.VtgateProcess.Terminate() + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + // Create a third connection, this connection should not be allowed. + // Set a connection timeout to 1s otherwise the connection will take a while. + vtParams.ConnectTimeoutMs = 1000 + _, err = mysql.Connect(context.Background(), &vtParams) + require.Error(t, err) + + // Idle connections should be allowed to execute queries until they are drained + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + + // Finish the transaction + _, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + _, err = vtConn2.ExecuteFetch("COMMIT", 1, false) + require.NoError(t, err) + vtConn2.Close() + + // This connection should still be allowed + _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) + require.NoError(t, err) + vtConn.Close() + + time.Sleep(10 * time.Second) + + // By now the vtgate should have shutdown + require.True(t, clusterInstance.VtgateProcess.IsShutdown()) +} diff --git a/go/vt/vtgate/endtoend/connectiondrain/schema.sql b/go/vt/vtgate/endtoend/connectiondrain/schema.sql new file mode 100644 index 00000000000..45db74d062f --- /dev/null +++ b/go/vt/vtgate/endtoend/connectiondrain/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 175f4b2cc8f..00301724f31 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -75,6 +75,7 @@ var ( mysqlDefaultWorkloadName = "OLTP" mysqlDefaultWorkload int32 + mysqlDrainOnTerm bool mysqlServerFlushDelay = 100 * time.Millisecond ) @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) { fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives") fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.") fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)") + fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain") } // vtgateHandler implements the Listener interface. @@ -621,18 +623,28 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys } func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { - if srv.tcpListener != nil { - srv.tcpListener.Shutdown() - srv.tcpListener = nil - } - if srv.unixListener != nil { - srv.unixListener.Shutdown() - srv.unixListener = nil - } if srv.sigChan != nil { signal.Stop(srv.sigChan) } + if mysqlDrainOnTerm { + stopListener(srv.unixListener, false) + stopListener(srv.tcpListener, false) + // We wait for connected clients to drain by themselves or to run into the onterm timeout + log.Infof("Starting drain loop, waiting for all clients to disconnect") + reported := time.Now() + for srv.vtgateHandle.numConnections() > 0 { + if time.Since(reported) > 2*time.Second { + log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections()) + reported = time.Now() + } + time.Sleep(1000 * time.Millisecond) + } + return + } + + stopListener(srv.unixListener, true) + stopListener(srv.tcpListener, true) if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 { log.Infof("Waiting for all client connections to be idle (%d active)...", busy) start := time.Now() @@ -649,6 +661,18 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { } } +// stopListener Close or Shutdown a mysql listener depending on the shutdown argument. +func stopListener(listener *mysql.Listener, shutdown bool) { + if listener == nil { + return + } + if shutdown { + listener.Shutdown() + } else { + listener.Close() + } +} + func (srv *mysqlServer) rollbackAtShutdown() { defer log.Flush() if srv.vtgateHandle == nil { From d9982be38ba7003f792abc7a0cab0acfff20e509 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Fri, 28 Jun 2024 15:14:42 -0600 Subject: [PATCH 02/10] Fix tests Signed-off-by: Florent Poinsard --- go/flags/endtoend/vtcombo.txt | 1 + go/test/endtoend/cluster/vtgate_process.go | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index fa9b3d6907e..b404d28f875 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -231,6 +231,7 @@ Flags: --mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP") --mysql_port int mysql port (default 3306) --mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance. + --mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain --mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms) --mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1) --mysql_server_query_timeout duration mysql query timeout diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 298d4b111df..72e1e118402 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -62,8 +62,9 @@ type VtgateProcess struct { // Extra Args to be set before starting the vtgate process ExtraArgs []string - proc *exec.Cmd - exit chan error + isShutdown bool + proc *exec.Cmd + exit chan error } const defaultVtGatePlannerVersion = planbuilder.Gen4 @@ -150,6 +151,7 @@ func (vtgate *VtgateProcess) Setup() (err error) { go func() { if vtgate.proc != nil { vtgate.exit <- vtgate.proc.Wait() + vtgate.isShutdown = true close(vtgate.exit) } }() @@ -239,7 +241,7 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints // IsShutdown checks if the vtgate process is shutdown func (vtgate *VtgateProcess) IsShutdown() bool { - return vtgate.proc.ProcessState.Exited() + return vtgate.isShutdown } // Terminate sends a SIGTERM to vtgate From c3a3b2f89d64beb4ecf7fec7a29ac0faeade7402 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Fri, 28 Jun 2024 15:33:17 -0600 Subject: [PATCH 03/10] Change the way we check if vtgate is shutdown or not + add comments Signed-off-by: Florent Poinsard --- go/test/endtoend/cluster/vtgate_process.go | 2 +- go/vt/vtgate/endtoend/connectiondrain/main_test.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 72e1e118402..5408b1282bb 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -241,7 +241,7 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints // IsShutdown checks if the vtgate process is shutdown func (vtgate *VtgateProcess) IsShutdown() bool { - return vtgate.isShutdown + return !vtgate.WaitForStatus() } // Terminate sends a SIGTERM to vtgate diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index 56276be417e..573ea03fb51 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -122,6 +122,7 @@ func TestConnectionDrain(t *testing.T) { err = clusterInstance.VtgateProcess.Terminate() require.NoError(t, err) + // Give enough time to vtgate to receive and start processing the SIGTERM signal time.Sleep(2 * time.Second) // Create a third connection, this connection should not be allowed. @@ -141,13 +142,17 @@ func TestConnectionDrain(t *testing.T) { require.NoError(t, err) vtConn2.Close() + // vtgate should still be running + require.False(t, clusterInstance.VtgateProcess.IsShutdown()) + // This connection should still be allowed _, err = vtConn.ExecuteFetch("select id1 from t1", 1, false) require.NoError(t, err) vtConn.Close() + // Give enough time for vtgate to finish all the onterm hooks without reaching the 30s of --onterm_timeout time.Sleep(10 * time.Second) - // By now the vtgate should have shutdown + // By now the vtgate should have shutdown on its own and without reaching --onterm_timeout require.True(t, clusterInstance.VtgateProcess.IsShutdown()) } From 1a81211b8c44c04d53e99dffcb6ba7cf25ffafd4 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Fri, 28 Jun 2024 15:36:32 -0600 Subject: [PATCH 04/10] Revert ConnectTimeoutMs when done Signed-off-by: Florent Poinsard --- go/vt/vtgate/endtoend/connectiondrain/main_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index 573ea03fb51..47bc28ab8a1 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -126,8 +126,12 @@ func TestConnectionDrain(t *testing.T) { time.Sleep(2 * time.Second) // Create a third connection, this connection should not be allowed. - // Set a connection timeout to 1s otherwise the connection will take a while. + // Set a connection timeout to 1s otherwise the connection will take forever + // and eventually vtgate will reach the --onterm_timeout. vtParams.ConnectTimeoutMs = 1000 + defer func() { + vtParams.ConnectTimeoutMs = 0 + }() _, err = mysql.Connect(context.Background(), &vtParams) require.Error(t, err) From c630c19c9867d0f9b6ed96c57921af88214e1299 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 1 Jul 2024 14:27:10 -0400 Subject: [PATCH 05/10] Apply review suggestions Signed-off-by: Florent Poinsard --- go/test/endtoend/cluster/vtgate_process.go | 6 ++---- go/vt/vtgate/plugin_mysql_server.go | 1 + 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 5408b1282bb..5143e9ad8a4 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -62,9 +62,8 @@ type VtgateProcess struct { // Extra Args to be set before starting the vtgate process ExtraArgs []string - isShutdown bool - proc *exec.Cmd - exit chan error + proc *exec.Cmd + exit chan error } const defaultVtGatePlannerVersion = planbuilder.Gen4 @@ -151,7 +150,6 @@ func (vtgate *VtgateProcess) Setup() (err error) { go func() { if vtgate.proc != nil { vtgate.exit <- vtgate.proc.Wait() - vtgate.isShutdown = true close(vtgate.exit) } }() diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 00301724f31..5b460ff3388 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -671,6 +671,7 @@ func stopListener(listener *mysql.Listener, shutdown bool) { } else { listener.Close() } + listener = nil } func (srv *mysqlServer) rollbackAtShutdown() { From 1f7dfd3d2da71053103e39b3fde8f043928786e1 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 1 Jul 2024 15:26:40 -0400 Subject: [PATCH 06/10] Add more tests Signed-off-by: Florent Poinsard --- .../endtoend/connectiondrain/main_test.go | 94 +++++++++++++------ 1 file changed, 66 insertions(+), 28 deletions(-) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index 47bc28ab8a1..dce8e5242b4 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -21,6 +21,7 @@ import ( _ "embed" "flag" "os" + "sync" "testing" "time" @@ -44,38 +45,35 @@ var ( func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() + os.Exit(m.Run()) +} - exitCode := func() int { - clusterInstance = cluster.NewCluster(cell, "localhost") - defer clusterInstance.Teardown() +func setupCluster(t *testing.T) { + clusterInstance = cluster.NewCluster(cell, "localhost") - // Start topo server - err := clusterInstance.StartTopo() - if err != nil { - return 1 - } + // Start topo server + err := clusterInstance.StartTopo() + require.NoError(t, err) - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: schemaSQL, - } - err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false) - if err != nil { - return 1 - } + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false) + require.NoError(t, err) - // Start vtgate - clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s") - err = clusterInstance.StartVtgate() - if err != nil { - return 1 - } + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s") + err = clusterInstance.StartVtgate() + require.NoError(t, err) - vtParams = clusterInstance.GetVTParams(keyspaceName) - return m.Run() - }() - os.Exit(exitCode) + vtParams = clusterInstance.GetVTParams(keyspaceName) +} + +func cleanupCluster() { + clusterInstance.Teardown() + clusterInstance = nil } func start(t *testing.T) (*mysql.Conn, func()) { @@ -100,7 +98,10 @@ func start(t *testing.T) (*mysql.Conn, func()) { } } -func TestConnectionDrain(t *testing.T) { +func TestConnectionDrainCloseConnections(t *testing.T) { + setupCluster(t) + defer cleanupCluster() + vtConn, closer := start(t) defer closer() @@ -160,3 +161,40 @@ func TestConnectionDrain(t *testing.T) { // By now the vtgate should have shutdown on its own and without reaching --onterm_timeout require.True(t, clusterInstance.VtgateProcess.IsShutdown()) } + +func TestConnectionDrainOnTermTimeout(t *testing.T) { + setupCluster(t) + defer cleanupCluster() + + // Connect to vtgate again, this should work + vtConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + vtConn2, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + // Tearing down vtgate here, we want to reach the onterm_timeout of 30s + err = clusterInstance.VtgateProcess.Terminate() + require.NoError(t, err) + + // Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) + require.Error(t, err) + vtConn.Close() + }() + + // Sleeping 40 seconds here is already plenty of time, and we will for sure reach the onterm_timeout of 30s + time.Sleep(40 * time.Second) + + // Running a query after we have reached the onterm_timeout should fail + _, err = vtConn2.ExecuteFetch("select id from t1", 1, false) + require.Error(t, err) + + wg.Wait() + + // By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened + require.True(t, clusterInstance.VtgateProcess.IsShutdown()) +} From 232cbcb1e0a0b6898056401cf8225ad1f2e885de Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 1 Jul 2024 16:57:58 -0400 Subject: [PATCH 07/10] Fix closing of connections Signed-off-by: Florent Poinsard --- go/vt/vtgate/endtoend/connectiondrain/main_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index dce8e5242b4..5f358fc17f1 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -172,6 +172,11 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) { vtConn2, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) + defer func() { + vtConn.Close() + vtConn2.Close() + }() + // Tearing down vtgate here, we want to reach the onterm_timeout of 30s err = clusterInstance.VtgateProcess.Terminate() require.NoError(t, err) @@ -183,7 +188,6 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) { defer wg.Done() _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) require.Error(t, err) - vtConn.Close() }() // Sleeping 40 seconds here is already plenty of time, and we will for sure reach the onterm_timeout of 30s From 8edd8bdc973ca31a887c0d27956b9e5a47fe865b Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 1 Jul 2024 17:20:48 -0400 Subject: [PATCH 08/10] Fix concurrency issues Signed-off-by: Florent Poinsard --- .../endtoend/connectiondrain/main_test.go | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index 5f358fc17f1..716cc9a8a37 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -33,10 +33,8 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - keyspaceName = "ks" - cell = "zone-1" + keyspaceName = "ks" + cell = "zone-1" //go:embed schema.sql schemaSQL string @@ -48,8 +46,8 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func setupCluster(t *testing.T) { - clusterInstance = cluster.NewCluster(cell, "localhost") +func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) { + clusterInstance := cluster.NewCluster(cell, "localhost") // Start topo server err := clusterInstance.StartTopo() @@ -68,15 +66,11 @@ func setupCluster(t *testing.T) { err = clusterInstance.StartVtgate() require.NoError(t, err) - vtParams = clusterInstance.GetVTParams(keyspaceName) + vtParams := clusterInstance.GetVTParams(keyspaceName) + return clusterInstance, vtParams } -func cleanupCluster() { - clusterInstance.Teardown() - clusterInstance = nil -} - -func start(t *testing.T) (*mysql.Conn, func()) { +func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) { vtConn, err := mysql.Connect(context.Background(), &vtParams) require.NoError(t, err) @@ -99,10 +93,10 @@ func start(t *testing.T) (*mysql.Conn, func()) { } func TestConnectionDrainCloseConnections(t *testing.T) { - setupCluster(t) - defer cleanupCluster() + clusterInstance, vtParams := setupCluster(t) + defer clusterInstance.Teardown() - vtConn, closer := start(t) + vtConn, closer := start(t, vtParams) defer closer() // Create a second connection, this connection will be used to create a transaction. @@ -163,8 +157,8 @@ func TestConnectionDrainCloseConnections(t *testing.T) { } func TestConnectionDrainOnTermTimeout(t *testing.T) { - setupCluster(t) - defer cleanupCluster() + clusterInstance, vtParams := setupCluster(t) + defer clusterInstance.Teardown() // Connect to vtgate again, this should work vtConn, err := mysql.Connect(context.Background(), &vtParams) From a57a57cf6287233ebb8efc65c32869c131107a5b Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Mon, 1 Jul 2024 17:31:49 -0400 Subject: [PATCH 09/10] Fix race issue with errors Signed-off-by: Florent Poinsard --- go/vt/vtgate/endtoend/connectiondrain/main_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/vt/vtgate/endtoend/connectiondrain/main_test.go index 716cc9a8a37..33a68f08645 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/vt/vtgate/endtoend/connectiondrain/main_test.go @@ -180,6 +180,7 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) { wg.Add(1) go func() { defer wg.Done() + var err error _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) require.Error(t, err) }() From f1b0d532813bfa8bda4eb8c2c11b9ff7c62c2912 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 2 Jul 2024 15:56:43 -0400 Subject: [PATCH 10/10] Apply review suggestions Signed-off-by: Florent Poinsard --- .../vtgate}/connectiondrain/main_test.go | 16 ++-------------- .../endtoend/vtgate}/connectiondrain/schema.sql | 0 go/vt/vtgate/plugin_mysql_server.go | 7 ++++++- test/config.json | 9 +++++++++ 4 files changed, 17 insertions(+), 15 deletions(-) rename go/{vt/vtgate/endtoend => test/endtoend/vtgate}/connectiondrain/main_test.go (94%) rename go/{vt/vtgate/endtoend => test/endtoend/vtgate}/connectiondrain/schema.sql (100%) diff --git a/go/vt/vtgate/endtoend/connectiondrain/main_test.go b/go/test/endtoend/vtgate/connectiondrain/main_test.go similarity index 94% rename from go/vt/vtgate/endtoend/connectiondrain/main_test.go rename to go/test/endtoend/vtgate/connectiondrain/main_test.go index 33a68f08645..32807b22bde 100644 --- a/go/vt/vtgate/endtoend/connectiondrain/main_test.go +++ b/go/test/endtoend/vtgate/connectiondrain/main_test.go @@ -21,7 +21,6 @@ import ( _ "embed" "flag" "os" - "sync" "testing" "time" @@ -176,24 +175,13 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) { require.NoError(t, err) // Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - var err error - _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) - require.Error(t, err) - }() - - // Sleeping 40 seconds here is already plenty of time, and we will for sure reach the onterm_timeout of 30s - time.Sleep(40 * time.Second) + _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) + require.Error(t, err) // Running a query after we have reached the onterm_timeout should fail _, err = vtConn2.ExecuteFetch("select id from t1", 1, false) require.Error(t, err) - wg.Wait() - // By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened require.True(t, clusterInstance.VtgateProcess.IsShutdown()) } diff --git a/go/vt/vtgate/endtoend/connectiondrain/schema.sql b/go/test/endtoend/vtgate/connectiondrain/schema.sql similarity index 100% rename from go/vt/vtgate/endtoend/connectiondrain/schema.sql rename to go/test/endtoend/vtgate/connectiondrain/schema.sql diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 5b460ff3388..55954013862 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -626,10 +626,15 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { if srv.sigChan != nil { signal.Stop(srv.sigChan) } + setListenerToNil := func() { + srv.tcpListener = nil + srv.unixListener = nil + } if mysqlDrainOnTerm { stopListener(srv.unixListener, false) stopListener(srv.tcpListener, false) + setListenerToNil() // We wait for connected clients to drain by themselves or to run into the onterm timeout log.Infof("Starting drain loop, waiting for all clients to disconnect") reported := time.Now() @@ -645,6 +650,7 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() { stopListener(srv.unixListener, true) stopListener(srv.tcpListener, true) + setListenerToNil() if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 { log.Infof("Waiting for all client connections to be idle (%d active)...", busy) start := time.Now() @@ -671,7 +677,6 @@ func stopListener(listener *mysql.Listener, shutdown bool) { } else { listener.Close() } - listener = nil } func (srv *mysqlServer) rollbackAtShutdown() { diff --git a/test/config.json b/test/config.json index 713faf97024..8b48ccc3ec0 100644 --- a/test/config.json +++ b/test/config.json @@ -500,6 +500,15 @@ "RetryMax": 2, "Tags": [] }, + "vtgate_connectiondrain": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/connectiondrain"], + "Command": [], + "Manual": false, + "Shard": "vtgate_general_heavy", + "RetryMax": 2, + "Tags": [] + }, "vtgate_queries_derived": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/derived"],