From 06e5ae483cec19dad0d5dd4800573f6885347f0c Mon Sep 17 00:00:00 2001
From: Tanjin Xu <109303790+tanjinx@users.noreply.github.com>
Date: Wed, 19 Feb 2025 15:42:23 -0800
Subject: [PATCH] pool: reopen connection closed by idle timeout (#17818) (#609)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Harshit Gangal
Signed-off-by: Vicent Martí <42793+vmg@users.noreply.github.com>
Co-authored-by: Harshit Gangal
Co-authored-by: Vicent Martí <42793+vmg@users.noreply.github.com>
---
 go/pools/smartconnpool/pool.go      | 24 ++++++++--
 go/pools/smartconnpool/pool_test.go | 73 +++++++++++++++++++++++------
 2 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index ddc2ce0a5ca..ef69e9cafda 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -133,7 +133,6 @@ type ConnPool[C Connection] struct {
 	connect Connector[C]
 	// refresh is the callback to check whether the pool needs to be refreshed
 	refresh RefreshCheck
-
 	// maxCapacity is the maximum value to which capacity can be set; when the pool
 	// is re-opened, it defaults to this capacity
 	maxCapacity int64
@@ -380,6 +379,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
 
 	if conn == nil {
 		var err error
+		// Using context.Background() is fine since MySQL connection already enforces
+		// a connect timeout via the `db_connect_timeout_ms` config param.
 		conn, err = pool.connNew(context.Background())
 		if err != nil {
 			pool.closedConn()
@@ -392,6 +393,8 @@
 	if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
 		pool.Metrics.maxLifetimeClosed.Add(1)
 		conn.Close()
+		// Using context.Background() is fine since MySQL connection already enforces
+		// a connect timeout via the `db_connect_timeout_ms` config param.
 		if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
 			pool.closedConn()
 			return
@@ -456,15 +459,22 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
 	return time.Duration(maxLifetime) + time.Duration(extended)
 }
 
-func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
-	var err error
+func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
 	dbconn.Conn, err = pool.config.connect(ctx)
 	if err != nil {
 		return err
 	}
 
-	dbconn.timeUsed.set(now)
+	if setting := dbconn.Conn.Setting(); setting != nil {
+		err = dbconn.Conn.ApplySetting(ctx, setting)
+		if err != nil {
+			dbconn.Close()
+			return err
+		}
+	}
+	dbconn.timeCreated.set(now)
+	dbconn.timeUsed.set(now)
 
 	return nil
 }
@@ -721,7 +731,11 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
 		if conn.timeUsed.expired(mono, timeout) {
 			pool.Metrics.idleClosed.Add(1)
 			conn.Close()
-			pool.closedConn()
+			// Using context.Background() is fine since MySQL connection already enforces
+			// a connect timeout via the `db_connect_timeout_ms` config param.
+			if err := pool.connReopen(context.Background(), conn, mono); err != nil {
+				pool.closedConn()
+			}
 		}
 	}
 }
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 6486844fb6c..cf0b18de252 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -36,12 +36,11 @@ var (
 
 type TestState struct {
 	lastID, open, close, reset atomic.Int64
-	waits                      []time.Time
 	mu                         sync.Mutex
-
-	chaos struct {
+	waits                      []time.Time
+	chaos                      struct {
 		delayConnect time.Duration
-		failConnect  bool
+		failConnect  atomic.Bool
 		failApply    bool
 	}
 }
@@ -109,7 +108,7 @@ func newConnector(state *TestState) Connector[*TestConn] {
 		if state.chaos.delayConnect != 0 {
 			time.Sleep(state.chaos.delayConnect)
 		}
-		if state.chaos.failConnect {
+		if state.chaos.failConnect.Load() {
 			return nil, fmt.Errorf("failed to connect: forced failure")
 		}
 		return &TestConn{
@@ -586,6 +585,45 @@ func TestUserClosing(t *testing.T) {
 	}
 }
 
+func TestConnReopen(t *testing.T) {
+	var state TestState
+
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    1,
+		IdleTimeout: 200 * time.Millisecond,
+		MaxLifetime: 10 * time.Millisecond,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	defer p.Close()
+
+	conn, err := p.Get(context.Background(), nil)
+	require.NoError(t, err)
+	assert.EqualValues(t, 1, state.lastID.Load())
+	assert.EqualValues(t, 1, p.Active())
+
+	// wait enough to reach maxlifetime.
+	time.Sleep(50 * time.Millisecond)
+
+	p.put(conn)
+	assert.EqualValues(t, 2, state.lastID.Load())
+	assert.EqualValues(t, 1, p.Active())
+
+	// wait enough to reach idle timeout.
+	time.Sleep(300 * time.Millisecond)
+	assert.GreaterOrEqual(t, state.lastID.Load(), int64(3))
+	assert.EqualValues(t, 1, p.Active())
+	assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(1))
+
+	// mark connect to fail
+	state.chaos.failConnect.Store(true)
+	// wait enough to reach idle timeout and connect to fail.
+	time.Sleep(300 * time.Millisecond)
+	// no active connection should be left.
+	assert.Zero(t, p.Active())
+
+}
+
 func TestIdleTimeout(t *testing.T) {
 	testTimeout := func(t *testing.T, setting *Setting) {
 		var state TestState
@@ -608,6 +646,7 @@ func TestIdleTimeout(t *testing.T) {
 			conns = append(conns, r)
 		}
 
+		assert.GreaterOrEqual(t, state.open.Load(), int64(5))
 		// wait a long while; ensure that none of the conns have been closed
 		time.Sleep(1 * time.Second)
 
@@ -628,9 +667,15 @@
 				t.Fatalf("Connections remain open after 1 second")
 			}
 		}
+		// At least 5 connections should have been closed by now.
+		assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(5), "At least 5 connections should have been closed by now.")
+
+		// At any point, at least 4 connections should be open, with 1 either in the process of opening or already opened.
+		// The idle connection closer shuts down one connection at a time.
+		assert.GreaterOrEqual(t, state.open.Load(), int64(4))
 
-		// no need to assert anything: all the connections in the pool should are idle-closed
-		// now and if they're not the test will timeout and fail
+		// The number of available connections in the pool should remain at 5.
+		assert.EqualValues(t, 5, p.Available())
 	}
 
 	t.Run("WithoutSettings", func(t *testing.T) { testTimeout(t, nil) })
@@ -656,7 +701,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
 		// Change the factory before putting back
 		// to prevent race with the idle closer, who will
 		// try to use it.
-		state.chaos.failConnect = true
+		state.chaos.failConnect.Store(true)
 		p.put(r)
 		timeout := time.After(1 * time.Second)
 		for p.Active() != 0 {
@@ -667,7 +712,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
 			}
 		}
 		// reset factory for next run.
-		state.chaos.failConnect = false
+		state.chaos.failConnect.Store(false)
 	}
 }
 
@@ -758,7 +803,7 @@ func TestExtendedLifetimeTimeout(t *testing.T) {
 
 func TestCreateFail(t *testing.T) {
 	var state TestState
-	state.chaos.failConnect = true
+	state.chaos.failConnect.Store(true)
 
 	ctx := context.Background()
 	p := NewPool(&Config[*TestConn]{
@@ -805,12 +850,12 @@ func TestCreateFailOnPut(t *testing.T) {
 		require.NoError(t, err)
 
 		// change factory to fail the put.
-		state.chaos.failConnect = true
+		state.chaos.failConnect.Store(true)
 		p.put(nil)
 		assert.Zero(t, p.Active())
 
 		// change back for next iteration.
-		state.chaos.failConnect = false
+		state.chaos.failConnect.Store(false)
 	}
 }
 
@@ -828,7 +873,7 @@ func TestSlowCreateFail(t *testing.T) {
 		LogWait: state.LogWait,
 	}).Open(newConnector(&state), nil)
 
-	state.chaos.failConnect = true
+	state.chaos.failConnect.Store(true)
 
 	for i := 0; i < 3; i++ {
 		go func() {
@@ -847,7 +892,7 @@ func TestSlowCreateFail(t *testing.T) {
 	default:
 	}
 
-	state.chaos.failConnect = false
+	state.chaos.failConnect.Store(false)
 
 	conn, err := p.Get(ctx, setting)
 	require.NoError(t, err)
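
The pool.go change is easiest to see as a pattern: before this patch, closeIdleResources closed an expired connection and gave the slot up (pool.closedConn()), so the pool shrank until a later Get re-dialed; after it, the slot is re-dialed immediately via connReopen, and the pool only shrinks when that reconnect itself fails. Below is a minimal, self-contained Go sketch of that pattern. It is illustrative only: pool, conn, dial, and closeIdle are invented stand-ins, not the smartconnpool API, and the real code additionally re-applies the connection's Setting and updates the idleClosed metric.

package main

import (
	"fmt"
	"time"
)

type conn struct {
	id       int
	lastUsed time.Time
}

type pool struct {
	idle    []*conn
	nextID  int
	timeout time.Duration
}

// dial stands in for the pool's connect callback.
func (p *pool) dial() *conn {
	p.nextID++
	return &conn{id: p.nextID, lastUsed: time.Now()}
}

// closeIdle mirrors the shape of closeIdleResources: expired connections
// are replaced in place instead of being dropped from the pool.
func (p *pool) closeIdle(now time.Time) {
	for i, c := range p.idle {
		if now.Sub(c.lastUsed) > p.timeout {
			// Pre-patch behavior: remove the slot, shrinking the pool.
			// Post-patch behavior: re-dial so capacity stays warm.
			p.idle[i] = p.dial()
		}
	}
}

func main() {
	p := &pool{timeout: time.Millisecond}
	p.idle = []*conn{p.dial(), p.dial()}

	// Let both connections expire, then run the idle closer.
	time.Sleep(5 * time.Millisecond)
	p.closeIdle(time.Now())

	fmt.Println(len(p.idle), "connections still pooled") // 2 connections still pooled
}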