diff --git a/go/pools/smartconnpool/connection.go b/go/pools/smartconnpool/connection.go
index cdb5720596e..dbc235a8218 100644
--- a/go/pools/smartconnpool/connection.go
+++ b/go/pools/smartconnpool/connection.go
@@ -19,7 +19,6 @@ package smartconnpool
 import (
 	"context"
 	"sync/atomic"
-	"time"
 )
 
 type Connection interface {
@@ -33,8 +32,8 @@ type Connection interface {
 
 type Pooled[C Connection] struct {
 	next        atomic.Pointer[Pooled[C]]
-	timeCreated time.Time
-	timeUsed    time.Time
+	timeCreated timestamp
+	timeUsed    timestamp
 	pool        *ConnPool[C]
 
 	Conn C
diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index 02d98c08be1..2ba62a93fd2 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -206,12 +206,20 @@ func (pool *ConnPool[C]) open() {
 		// Do not allow connections to starve; if there's waiters in the queue
 		// and connections in the stack, it means we could be starving them.
 		// Try getting out a connection and handing it over directly
-		timeout := pool.IdleTimeout()
-		for n := 0; n < maybeStarving && pool.tryReturnAnyConn(timeout, now); n++ {
+		for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
 		}
 		return true
 	})
 
+	idleTimeout := pool.IdleTimeout()
+	if idleTimeout != 0 {
+		// The idle worker takes care of closing connections that have been idle too long
+		pool.runWorker(pool.close, idleTimeout/10, func(now time.Time) bool {
+			pool.closeIdleResources(now)
+			return true
+		})
+	}
+
 	refreshInterval := pool.RefreshInterval()
 	if refreshInterval != 0 && pool.config.refresh != nil {
 		// The refresh worker periodically checks the refresh callback in this pool
@@ -399,13 +407,13 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
 			return
 		}
 	} else {
-		conn.timeUsed = time.Now()
+		conn.timeUsed.update()
 
 		lifetime := pool.extendedMaxLifetime()
-		if lifetime > 0 && time.Until(conn.timeCreated.Add(lifetime)) < 0 {
+		if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
 			pool.Metrics.maxLifetimeClosed.Add(1)
 			conn.Close()
-			if err := pool.connReopen(context.Background(), conn, conn.timeUsed); err != nil {
+			if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
 				pool.closedConn()
 				return
 			}
@@ -433,30 +441,25 @@ func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
 	return false
 }
 
-func (pool *ConnPool[C]) pop(stack *connStack[C], timeout time.Duration, now time.Time) *Pooled[C] {
-	conn, ok := stack.Pop()
-
-	if timeout > 0 {
-		for ; ok; conn, ok = stack.Pop() {
-			if conn.timeUsed.Add(timeout).Sub(now) >= 0 {
-				break
-			}
-
-			pool.Metrics.idleClosed.Add(1)
-			conn.Close()
-			pool.closedConn()
+func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
+	var conn *Pooled[C]
+	var ok bool
+
+	for conn, ok = stack.Pop(); ok; conn, ok = stack.Pop() {
+		if conn.timeUsed.borrow() {
+			break
 		}
 	}
 
 	return conn
 }
 
-func (pool *ConnPool[C]) tryReturnAnyConn(timeout time.Duration, now time.Time) bool {
-	if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
+func (pool *ConnPool[C]) tryReturnAnyConn() bool {
+	if conn := pool.pop(&pool.clean); conn != nil {
 		return pool.tryReturnConn(conn)
 	}
 	for u := 0; u <= stackMask; u++ {
-		if conn := pool.pop(&pool.settings[u], timeout, now); conn != nil {
+		if conn := pool.pop(&pool.settings[u]); conn != nil {
 			return pool.tryReturnConn(conn)
 		}
 	}
@@ -488,15 +491,15 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
 	return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
 }
 
-func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
+func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
 	var err error
 	dbconn.Conn, err = pool.config.connect(ctx)
 	if err != nil {
 		return err
 	}
 
-	dbconn.timeUsed = now
-	dbconn.timeCreated = now
+	dbconn.timeUsed.set(now)
+	dbconn.timeCreated.set(now)
 	return nil
 }
 
@@ -505,16 +508,17 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
 	if err != nil {
 		return nil, err
 	}
-	now := time.Now()
-	return &Pooled[C]{
-		timeCreated: now,
-		timeUsed:    now,
-		pool:        pool,
-		Conn:        conn,
-	}, nil
+	pooled := &Pooled[C]{
+		pool: pool,
+		Conn: conn,
+	}
+	now := monotonicNow()
+	pooled.timeUsed.set(now)
+	pooled.timeCreated.set(now)
+	return pooled, nil
 }
 
-func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting, timeout time.Duration, now time.Time) *Pooled[C] {
+func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
 	var start uint32
 	if setting == nil {
 		start = uint32(pool.freshSettingsStack.Load())
@@ -524,7 +528,7 @@ func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting, timeout time.Dur
 
 	for i := uint32(0); i <= stackMask; i++ {
 		pos := (i + start) & stackMask
-		if conn := pool.pop(&pool.settings[pos], timeout, now); conn != nil {
+		if conn := pool.pop(&pool.settings[pos]); conn != nil {
 			return conn
 		}
 	}
@@ -555,11 +559,10 @@ func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {
 
 // get returns a pooled connection with no Setting applied
 func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
-	timeout, now := pool.IdleTimeout(), time.Now()
 	pool.Metrics.getCount.Add(1)
 
 	// best case: if there's a connection in the clean stack, return it right away
-	if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
+	if conn := pool.pop(&pool.clean); conn != nil {
 		pool.borrowed.Add(1)
 		return conn, nil
 	}
@@ -571,7 +574,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
 	}
 	// if we don't have capacity, try popping a connection from any of the setting stacks
 	if conn == nil {
-		conn = pool.getFromSettingsStack(nil, timeout, now)
+		conn = pool.getFromSettingsStack(nil)
 	}
 	// if there are no connections in the setting stacks and we've lent out connections
 	// to other clients, wait until one of the connections is returned
@@ -595,7 +598,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
 		err = conn.Conn.ResetSetting(ctx)
 		if err != nil {
 			conn.Close()
-			err = pool.connReopen(ctx, conn, time.Now())
+			err = pool.connReopen(ctx, conn, monotonicNow())
 			if err != nil {
 				pool.closedConn()
 				return nil, err
@@ -609,15 +612,14 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
 
 // getWithSetting returns a connection from the pool with the given Setting applied
 func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (*Pooled[C], error) {
-	timeout, now := pool.IdleTimeout(), time.Now()
 	pool.Metrics.getWithSettingsCount.Add(1)
 
 	var err error
 	// best case: check if there's a connection in the setting stack where our Setting belongs
-	conn := pool.pop(&pool.settings[setting.bucket&stackMask], timeout, now)
+	conn := pool.pop(&pool.settings[setting.bucket&stackMask])
 	// if there's connection with our setting, try popping a clean connection
 	if conn == nil {
-		conn = pool.pop(&pool.clean, timeout, now)
+		conn = pool.pop(&pool.clean)
 	}
 	// otherwise try opening a brand new connection and we'll apply the setting to it
 	if conn == nil {
@@ -629,7 +631,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
 	// try on the _other_ setting stacks, even if we have to reset the Setting for the returned
 	// connection
 	if conn == nil {
-		conn = pool.getFromSettingsStack(setting, timeout, now)
+		conn = pool.getFromSettingsStack(setting)
 	}
 	// no connections anywhere in the pool; if we've lent out connections to other clients
 	// wait for one of them
@@ -656,7 +658,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
 		err = conn.Conn.ResetSetting(ctx)
 		if err != nil {
 			conn.Close()
-			err = pool.connReopen(ctx, conn, time.Now())
+			err = pool.connReopen(ctx, conn, monotonicNow())
 			if err != nil {
 				pool.closedConn()
 				return nil, err
@@ -719,9 +721,9 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
 		}
 
 		// try closing from connections which are currently idle in the stacks
-		conn := pool.getFromSettingsStack(nil, 0, time.Time{})
+		conn := pool.getFromSettingsStack(nil)
 		if conn == nil {
-			conn, _ = pool.clean.Pop()
+			conn = pool.pop(&pool.clean)
 		}
 		if conn == nil {
 			time.Sleep(delay)
@@ -734,6 +736,33 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
 	return nil
 }
 
+func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
+	timeout := pool.IdleTimeout()
+	if timeout == 0 {
+		return
+	}
+	if pool.Capacity() == 0 {
+		return
+	}
+
+	mono := monotonicFromTime(now)
+
+	closeInStack := func(s *connStack[C]) {
+		for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
+			if conn.timeUsed.expired(mono, timeout) {
+				pool.Metrics.idleClosed.Add(1)
+				conn.Close()
+				pool.closedConn()
+			}
+		}
+	}
+
+	for i := 0; i <= stackMask; i++ {
+		closeInStack(&pool.settings[i])
+	}
+	closeInStack(&pool.clean)
+}
+
 func (pool *ConnPool[C]) StatsJSON() map[string]any {
 	return map[string]any{
 		"Capacity": int(pool.Capacity()),
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 5d029c16e6f..202261e6f3b 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -619,17 +619,8 @@ func TestIdleTimeout(t *testing.T) {
 		p.put(conn)
 	}
 
-	// wait again; the connections will timeout inside the pool, but we'll only notice
-	// when trying to pop them again
 	time.Sleep(1 * time.Second)
 
-	for i := 0; i < 5; i++ {
-		r, err := p.Get(ctx, setting)
-		require.NoError(t, err)
-
-		p.put(r)
-	}
-
 	for _, closed := range closers {
 		select {
 		case <-closed:
diff --git a/go/pools/smartconnpool/stack.go b/go/pools/smartconnpool/stack.go
index ea7ae50201e..cdf232b11e2 100644
--- a/go/pools/smartconnpool/stack.go
+++ b/go/pools/smartconnpool/stack.go
@@ -54,6 +54,11 @@ func (s *connStack[C]) Pop() (*Pooled[C], bool) {
 	}
 }
 
+func (s *connStack[C]) Peek() *Pooled[C] {
+	top, _ := s.top.Load()
+	return top
+}
+
 func (s *connStack[C]) PopAll(out []*Pooled[C]) []*Pooled[C] {
 	var oldHead *Pooled[C]
 
diff --git a/go/pools/smartconnpool/timestamp.go b/go/pools/smartconnpool/timestamp.go
new file mode 100644
index 00000000000..6654ac35269
--- /dev/null
+++ b/go/pools/smartconnpool/timestamp.go
@@ -0,0 +1,66 @@
+package smartconnpool
+
+import (
+	"math"
+	"sync/atomic"
+	"time"
+)
+
+var monotonicRoot = time.Now()
+
+type timestamp struct {
+	nano atomic.Int64
+}
+
+const timestampExpired = math.MaxInt64
+const timestampBusy = math.MinInt64
+
+func (t *timestamp) update() {
+	t.nano.Store(int64(monotonicNow()))
+}
+
+func monotonicNow() time.Duration {
+	return time.Since(monotonicRoot)
+}
+
+func monotonicFromTime(now time.Time) time.Duration {
+	return now.Sub(monotonicRoot)
+}
+
+func (t *timestamp) set(mono time.Duration) {
+	t.nano.Store(int64(mono))
+}
+
+func (t *timestamp) get() time.Duration {
+	return time.Duration(t.nano.Load())
+}
+
+func (t *timestamp) elapsed() time.Duration {
+	return monotonicNow() - t.get()
+}
+
+func (t *timestamp) borrow() bool {
+	stamp := t.nano.Load()
+	switch stamp {
+	case timestampExpired:
+		return false
+	case timestampBusy:
+		panic("timestampBusy when borrowing a time")
+	default:
+		return t.nano.CompareAndSwap(stamp, timestampBusy)
+	}
+}
+
+func (t *timestamp) expired(now time.Duration, timeout time.Duration) bool {
+	stamp := t.nano.Load()
+	if stamp == timestampExpired {
+		return false
+	}
+	if stamp == timestampBusy {
+		return false
+	}
+	if now-time.Duration(stamp) > timeout {
+		return t.nano.CompareAndSwap(stamp, timestampExpired)
+	}
+	return false
+}
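Reviewer note: the core of this change is the lock-free handshake between pop() (which calls timestamp.borrow()) and the new idle worker (which calls timestamp.expired() while walking the stacks in place). The standalone sketch below is not part of the patch; it reproduces that handshake with simplified, hypothetical names (slot, expire, expiredStamp, busyStamp) to show why a connection can never be both handed out and reclaimed: both sides CAS against the same loaded stamp, so exactly one CAS can win.

// Standalone sketch (assumed names, not part of the patch): a getter and an
// idle closer race on the same slot; the CAS protocol guarantees exactly one
// of them wins, never both and never neither.
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
)

// Sentinel stamps, mirroring timestampExpired and timestampBusy in the patch.
const (
	expiredStamp = math.MaxInt64 // slot was reclaimed by the idle closer
	busyStamp    = math.MinInt64 // slot was handed out by the getter
)

// slot is a stand-in for the patch's timestamp type.
type slot struct{ nano atomic.Int64 }

// borrow marks the slot as in use; it fails if the idle closer already
// reclaimed it. Mirrors timestamp.borrow().
func (s *slot) borrow() bool {
	stamp := s.nano.Load()
	switch stamp {
	case expiredStamp:
		return false
	case busyStamp:
		panic("double borrow")
	default:
		return s.nano.CompareAndSwap(stamp, busyStamp)
	}
}

// expire reclaims the slot if it has been idle longer than timeout; it fails
// if a borrower won the race first. Mirrors timestamp.expired().
func (s *slot) expire(now, timeout time.Duration) bool {
	stamp := s.nano.Load()
	if stamp == expiredStamp || stamp == busyStamp {
		return false
	}
	if now-time.Duration(stamp) > timeout {
		return s.nano.CompareAndSwap(stamp, expiredStamp)
	}
	return false
}

func main() {
	var s slot
	s.nano.Store(1) // a stale stamp, old enough to be eligible for expiration

	var borrowed, reclaimed atomic.Int64
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // the getter path (pop)
		defer wg.Done()
		if s.borrow() {
			borrowed.Add(1)
		}
	}()
	go func() { // the idle-worker path (closeIdleResources)
		defer wg.Done()
		if s.expire(time.Hour, time.Minute) {
			reclaimed.Add(1)
		}
	}()
	wg.Wait()

	// Exactly one counter is 1: whichever CAS lost saw the stamp change
	// out from under it and backed off.
	fmt.Println("borrowed:", borrowed.Load(), "reclaimed:", reclaimed.Load())
}

This is also why TestIdleTimeout no longer needs to re-Get the connections to observe expiration: the idle worker now closes expired connections in place, rather than expiration being detected lazily on the next pop.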