Skip to content

Commit

Permalink
pool: close connections on the worker
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <vmg@strn.cat>
  • Loading branch information
vmg committed Feb 12, 2025
1 parent d9246a7 commit e3e08b4
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 54 deletions.
5 changes: 2 additions & 3 deletions go/pools/smartconnpool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package smartconnpool
import (
"context"
"sync/atomic"
"time"
)

type Connection interface {
Expand All @@ -33,8 +32,8 @@ type Connection interface {

type Pooled[C Connection] struct {
next atomic.Pointer[Pooled[C]]
timeCreated time.Time
timeUsed time.Time
timeCreated timestamp
timeUsed timestamp
pool *ConnPool[C]

Conn C
Expand Down
113 changes: 71 additions & 42 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,20 @@ func (pool *ConnPool[C]) open() {
// Do not allow connections to starve; if there's waiters in the queue
// and connections in the stack, it means we could be starving them.
// Try getting out a connection and handing it over directly
timeout := pool.IdleTimeout()
for n := 0; n < maybeStarving && pool.tryReturnAnyConn(timeout, now); n++ {
for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
}
return true
})

idleTimeout := pool.IdleTimeout()
if idleTimeout != 0 {
// The idle worker takes care of closing connections that have been idle too long
pool.runWorker(pool.close, idleTimeout/10, func(now time.Time) bool {
pool.closeIdleResources(now)
return true
})
}

refreshInterval := pool.RefreshInterval()
if refreshInterval != 0 && pool.config.refresh != nil {
// The refresh worker periodically checks the refresh callback in this pool
Expand Down Expand Up @@ -399,13 +407,13 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
return
}
} else {
conn.timeUsed = time.Now()
conn.timeUsed.update()

lifetime := pool.extendedMaxLifetime()
if lifetime > 0 && time.Until(conn.timeCreated.Add(lifetime)) < 0 {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
if err := pool.connReopen(context.Background(), conn, conn.timeUsed); err != nil {
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
}
Expand Down Expand Up @@ -433,30 +441,25 @@ func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
return false
}

func (pool *ConnPool[C]) pop(stack *connStack[C], timeout time.Duration, now time.Time) *Pooled[C] {
conn, ok := stack.Pop()
func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
var conn *Pooled[C]
var ok bool

if timeout > 0 {
for ; ok; conn, ok = stack.Pop() {
if conn.timeUsed.Add(timeout).Sub(now) >= 0 {
break
}

pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
for conn, ok = stack.Pop(); ok; conn, ok = stack.Pop() {
if conn.timeUsed.borrow() {
break
}
}

return conn
}

func (pool *ConnPool[C]) tryReturnAnyConn(timeout time.Duration, now time.Time) bool {
if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
func (pool *ConnPool[C]) tryReturnAnyConn() bool {
if conn := pool.pop(&pool.clean); conn != nil {
return pool.tryReturnConn(conn)
}
for u := 0; u <= stackMask; u++ {
if conn := pool.pop(&pool.settings[u], timeout, now); conn != nil {
if conn := pool.pop(&pool.settings[u]); conn != nil {
return pool.tryReturnConn(conn)
}
}
Expand Down Expand Up @@ -488,15 +491,15 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
var err error
dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
}

dbconn.timeUsed = now
dbconn.timeCreated = now
dbconn.timeUsed.set(now)
dbconn.timeCreated.set(now)
return nil
}

Expand All @@ -505,16 +508,17 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
if err != nil {
return nil, err
}
now := time.Now()
return &Pooled[C]{
timeCreated: now,
timeUsed: now,
pool: pool,
Conn: conn,
}, nil
pooled := &Pooled[C]{
pool: pool,
Conn: conn,
}
now := monotonicNow()
pooled.timeUsed.set(now)
pooled.timeCreated.set(now)
return pooled, nil
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting, timeout time.Duration, now time.Time) *Pooled[C] {
func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
var start uint32
if setting == nil {
start = uint32(pool.freshSettingsStack.Load())
Expand All @@ -524,7 +528,7 @@ func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting, timeout time.Dur

for i := uint32(0); i <= stackMask; i++ {
pos := (i + start) & stackMask
if conn := pool.pop(&pool.settings[pos], timeout, now); conn != nil {
if conn := pool.pop(&pool.settings[pos]); conn != nil {
return conn
}
}
Expand Down Expand Up @@ -555,11 +559,10 @@ func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {

// get returns a pooled connection with no Setting applied
func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
timeout, now := pool.IdleTimeout(), time.Now()
pool.Metrics.getCount.Add(1)

// best case: if there's a connection in the clean stack, return it right away
if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
if conn := pool.pop(&pool.clean); conn != nil {
pool.borrowed.Add(1)
return conn, nil
}
Expand All @@ -571,7 +574,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
}
// if we don't have capacity, try popping a connection from any of the setting stacks
if conn == nil {
conn = pool.getFromSettingsStack(nil, timeout, now)
conn = pool.getFromSettingsStack(nil)
}
// if there are no connections in the setting stacks and we've lent out connections
// to other clients, wait until one of the connections is returned
Expand All @@ -595,7 +598,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
Expand All @@ -609,15 +612,14 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {

// getWithSetting returns a connection from the pool with the given Setting applied
func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (*Pooled[C], error) {
timeout, now := pool.IdleTimeout(), time.Now()
pool.Metrics.getWithSettingsCount.Add(1)

var err error
// best case: check if there's a connection in the setting stack where our Setting belongs
conn := pool.pop(&pool.settings[setting.bucket&stackMask], timeout, now)
conn := pool.pop(&pool.settings[setting.bucket&stackMask])
// if there's connection with our setting, try popping a clean connection
if conn == nil {
conn = pool.pop(&pool.clean, timeout, now)
conn = pool.pop(&pool.clean)
}
// otherwise try opening a brand new connection and we'll apply the setting to it
if conn == nil {
Expand All @@ -629,7 +631,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
// try on the _other_ setting stacks, even if we have to reset the Setting for the returned
// connection
if conn == nil {
conn = pool.getFromSettingsStack(setting, timeout, now)
conn = pool.getFromSettingsStack(setting)
}
// no connections anywhere in the pool; if we've lent out connections to other clients
// wait for one of them
Expand All @@ -656,7 +658,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
Expand Down Expand Up @@ -719,9 +721,9 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
}

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil, 0, time.Time{})
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
conn = pool.pop(&pool.clean)
}
if conn == nil {
time.Sleep(delay)
Expand All @@ -734,6 +736,33 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
return nil
}

func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
timeout := pool.IdleTimeout()
if timeout == 0 {
return
}
if pool.Capacity() == 0 {
return
}

mono := monotonicFromTime(now)

closeInStack := func(s *connStack[C]) {
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
}
}
}

for i := 0; i <= stackMask; i++ {
closeInStack(&pool.settings[i])
}
closeInStack(&pool.clean)
}

func (pool *ConnPool[C]) StatsJSON() map[string]any {
return map[string]any{
"Capacity": int(pool.Capacity()),
Expand Down
9 changes: 0 additions & 9 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,17 +619,8 @@ func TestIdleTimeout(t *testing.T) {
p.put(conn)
}

// wait again; the connections will timeout inside the pool, but we'll only notice
// when trying to pop them again
time.Sleep(1 * time.Second)

for i := 0; i < 5; i++ {
r, err := p.Get(ctx, setting)
require.NoError(t, err)

p.put(r)
}

for _, closed := range closers {
select {
case <-closed:
Expand Down
5 changes: 5 additions & 0 deletions go/pools/smartconnpool/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (s *connStack[C]) Pop() (*Pooled[C], bool) {
}
}

func (s *connStack[C]) Peek() *Pooled[C] {
top, _ := s.top.Load()
return top
}

func (s *connStack[C]) PopAll(out []*Pooled[C]) []*Pooled[C] {
var oldHead *Pooled[C]

Expand Down
66 changes: 66 additions & 0 deletions go/pools/smartconnpool/timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package smartconnpool

import (
"math"
"sync/atomic"
"time"
)

var monotonicRoot = time.Now()

type timestamp struct {
nano atomic.Int64
}

const timestampExpired = math.MaxInt64
const timestampBusy = math.MinInt64

func (t *timestamp) update() {
t.nano.Store(int64(monotonicNow()))
}

func monotonicNow() time.Duration {
return time.Since(monotonicRoot)
}

func monotonicFromTime(now time.Time) time.Duration {
return now.Sub(monotonicRoot)
}

func (t *timestamp) set(mono time.Duration) {
t.nano.Store(int64(mono))
}

func (t *timestamp) get() time.Duration {
return time.Duration(t.nano.Load())
}

func (t *timestamp) elapsed() time.Duration {
return monotonicNow() - t.get()
}

func (t *timestamp) borrow() bool {
stamp := t.nano.Load()
switch stamp {
case timestampExpired:
return false
case timestampBusy:
panic("timestampBusy when borrowing a time")
default:
return t.nano.CompareAndSwap(stamp, timestampBusy)
}
}

func (t *timestamp) expired(now time.Duration, timeout time.Duration) bool {
stamp := t.nano.Load()
if stamp == timestampExpired {
return false
}
if stamp == timestampBusy {
return false
}
if now-time.Duration(stamp) > timeout {
return t.nano.CompareAndSwap(stamp, timestampExpired)
}
return false
}

0 comments on commit e3e08b4

Please sign in to comment.