Close on pop
Signed-off-by: Vicent Marti <vmg@strn.cat>
vmg committed Feb 12, 2025
1 parent c47f1bd commit d9246a7
Showing 2 changed files with 50 additions and 57 deletions.
90 changes: 34 additions & 56 deletions go/pools/smartconnpool/pool.go
@@ -19,7 +19,6 @@ package smartconnpool
import (
    "context"
    "math/rand/v2"
-    "slices"
    "sync"
    "sync/atomic"
    "time"
@@ -201,26 +200,18 @@ func (pool *ConnPool[C]) open() {

    // The expire worker takes care of removing from the waiter list any clients whose
    // context has been cancelled.
-    pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
+    pool.runWorker(pool.close, 100*time.Millisecond, func(now time.Time) bool {
        maybeStarving := pool.wait.expire(false)

        // Do not allow connections to starve; if there's waiters in the queue
        // and connections in the stack, it means we could be starving them.
        // Try getting out a connection and handing it over directly
-        for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
+        timeout := pool.IdleTimeout()
+        for n := 0; n < maybeStarving && pool.tryReturnAnyConn(timeout, now); n++ {
        }
        return true
    })
-
-    idleTimeout := pool.IdleTimeout()
-    if idleTimeout != 0 {
-        // The idle worker takes care of closing connections that have been idle too long
-        pool.runWorker(pool.close, idleTimeout/10, func(now time.Time) bool {
-            pool.closeIdleResources(now)
-            return true
-        })
-    }

    refreshInterval := pool.RefreshInterval()
    if refreshInterval != 0 && pool.config.refresh != nil {
        // The refresh worker periodically checks the refresh callback in this pool
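
Note on the expire worker change above: the closure now accepts the tick timestamp (`now`) instead of discarding it, and it looks up the pool's idle timeout on every tick, so the starvation-avoidance path can also discard stale connections through tryReturnAnyConn. The diff does not show runWorker itself; the sketch below is only a rough stand-in, assuming it is a ticker-driven loop that hands the tick time to its callback (names and structure are illustrative, not the actual helper).

package main

import "time"

// runWorkerSketch is a hypothetical stand-in for the pool's runWorker helper:
// it calls worker with each tick's timestamp until worker returns false or the
// stop channel is closed. This is where a callback of the form
// func(now time.Time) bool would receive its `now` argument.
func runWorkerSketch(stop <-chan struct{}, interval time.Duration, worker func(now time.Time) bool) {
    go func() {
        tick := time.NewTicker(interval)
        defer tick.Stop()
        for {
            select {
            case now := <-tick.C:
                if !worker(now) {
                    return
                }
            case <-stop:
                return
            }
        }
    }()
}

func main() {
    stop := make(chan struct{})
    runWorkerSketch(stop, 100*time.Millisecond, func(now time.Time) bool {
        // e.g. expire waiters and hand over (or idle-close) connections here
        return true
    })
    time.Sleep(350 * time.Millisecond)
    close(stop)
}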
@@ -442,12 +433,30 @@ func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
    return false
}

-func (pool *ConnPool[C]) tryReturnAnyConn() bool {
-    if conn, ok := pool.clean.Pop(); ok {
+func (pool *ConnPool[C]) pop(stack *connStack[C], timeout time.Duration, now time.Time) *Pooled[C] {
+    conn, ok := stack.Pop()
+
+    if timeout > 0 {
+        for ; ok; conn, ok = stack.Pop() {
+            if conn.timeUsed.Add(timeout).Sub(now) >= 0 {
+                break
+            }
+
+            pool.Metrics.idleClosed.Add(1)
+            conn.Close()
+            pool.closedConn()
+        }
+    }
+
+    return conn
+}
+
+func (pool *ConnPool[C]) tryReturnAnyConn(timeout time.Duration, now time.Time) bool {
+    if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
        return pool.tryReturnConn(conn)
    }
    for u := 0; u <= stackMask; u++ {
-        if conn, ok := pool.settings[u].Pop(); ok {
+        if conn := pool.pop(&pool.settings[u], timeout, now); conn != nil {
            return pool.tryReturnConn(conn)
        }
    }
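
The new pop helper is the heart of this change: instead of a periodic sweeper, connections are checked for staleness at the moment they are popped off a stack, and anything idle longer than the timeout is closed and counted before a fresh connection (or nil) is returned. Note that `conn.timeUsed.Add(timeout).Sub(now) >= 0` is just `now.Sub(conn.timeUsed) <= timeout`. Below is a self-contained sketch of the same close-on-pop pattern using a plain slice in place of the pool's connStack; the types and names (item, toyStack, popFresh) are made up for illustration.

package main

import (
    "fmt"
    "time"
)

type item struct {
    id       int
    timeUsed time.Time
}

// toyStack stands in for the pool's connStack; Pop takes from the top.
type toyStack []*item

func (s *toyStack) Pop() (*item, bool) {
    if len(*s) == 0 {
        return nil, false
    }
    top := (*s)[len(*s)-1]
    *s = (*s)[:len(*s)-1]
    return top, true
}

// popFresh mirrors pop() above: discard (close) items idle longer than timeout,
// return the first fresh one, or nil if the stack runs out.
func popFresh(s *toyStack, timeout time.Duration, now time.Time) *item {
    it, ok := s.Pop()
    if timeout > 0 {
        for ; ok; it, ok = s.Pop() {
            // timeUsed.Add(timeout).Sub(now) >= 0 is the same as now.Sub(timeUsed) <= timeout
            if it.timeUsed.Add(timeout).Sub(now) >= 0 {
                break
            }
            fmt.Printf("idle-closing item %d\n", it.id)
        }
    }
    return it
}

func main() {
    now := time.Now()
    s := toyStack{
        {id: 1, timeUsed: now.Add(-10 * time.Second)}, // fresh, at the bottom
        {id: 2, timeUsed: now.Add(-3 * time.Minute)},  // stale
        {id: 3, timeUsed: now.Add(-2 * time.Minute)},  // stale, popped first
    }
    if it := popFresh(&s, time.Minute, now); it != nil {
        fmt.Println("got item", it.id) // prints "got item 1" after closing items 3 and 2
    }
}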
Expand Down Expand Up @@ -505,7 +514,7 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
}, nil
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting, timeout time.Duration, now time.Time) *Pooled[C] {
var start uint32
if setting == nil {
start = uint32(pool.freshSettingsStack.Load())
@@ -515,7 +524,7 @@ func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {

    for i := uint32(0); i <= stackMask; i++ {
        pos := (i + start) & stackMask
-        if conn, ok := pool.settings[pos].Pop(); ok {
+        if conn := pool.pop(&pool.settings[pos], timeout, now); conn != nil {
            return conn
        }
    }
@@ -546,10 +555,11 @@ func (pool *ConnPool[C]) getNew(ctx context.Context) (*Pooled[C], error) {

// get returns a pooled connection with no Setting applied
func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
+    timeout, now := pool.IdleTimeout(), time.Now()
    pool.Metrics.getCount.Add(1)

    // best case: if there's a connection in the clean stack, return it right away
-    if conn, ok := pool.clean.Pop(); ok {
+    if conn := pool.pop(&pool.clean, timeout, now); conn != nil {
        pool.borrowed.Add(1)
        return conn, nil
    }
@@ -561,7 +571,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
    }
    // if we don't have capacity, try popping a connection from any of the setting stacks
    if conn == nil {
-        conn = pool.getFromSettingsStack(nil)
+        conn = pool.getFromSettingsStack(nil, timeout, now)
    }
    // if there are no connections in the setting stacks and we've lent out connections
    // to other clients, wait until one of the connections is returned
@@ -599,14 +609,15 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {

// getWithSetting returns a connection from the pool with the given Setting applied
func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (*Pooled[C], error) {
+    timeout, now := pool.IdleTimeout(), time.Now()
    pool.Metrics.getWithSettingsCount.Add(1)

    var err error
    // best case: check if there's a connection in the setting stack where our Setting belongs
-    conn, _ := pool.settings[setting.bucket&stackMask].Pop()
+    conn := pool.pop(&pool.settings[setting.bucket&stackMask], timeout, now)
    // if there's connection with our setting, try popping a clean connection
    if conn == nil {
-        conn, _ = pool.clean.Pop()
+        conn = pool.pop(&pool.clean, timeout, now)
    }
    // otherwise try opening a brand new connection and we'll apply the setting to it
    if conn == nil {
@@ -618,7 +629,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
    // try on the _other_ setting stacks, even if we have to reset the Setting for the returned
    // connection
    if conn == nil {
-        conn = pool.getFromSettingsStack(setting)
+        conn = pool.getFromSettingsStack(setting, timeout, now)
    }
    // no connections anywhere in the pool; if we've lent out connections to other clients
    // wait for one of them
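
getWithSetting now uses pop for all of its stack lookups, in the same order as before: the stack for the Setting's own bucket, then the clean stack, then a freshly opened connection, and finally the other setting stacks. The bucket index is wrapped onto the fixed array of setting stacks with `setting.bucket & stackMask`; the mask's actual value is defined elsewhere in the package, but assuming it has the usual 2^k-1 shape, the mapping works as in this purely illustrative snippet (the constant value and helper name are assumptions, not taken from the source):

const stackMask = 7 // assumed value for the example: 8 setting stacks

// bucketToStack is a hypothetical helper showing how masking wraps any bucket
// number onto one of the stackMask+1 setting stacks.
func bucketToStack(bucket uint32) uint32 {
    return bucket & stackMask // bucket 5 -> stack 5, bucket 13 -> stack 5, bucket 7 -> stack 7
}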
@@ -708,7 +719,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
        }

        // try closing from connections which are currently idle in the stacks
-        conn := pool.getFromSettingsStack(nil)
+        conn := pool.getFromSettingsStack(nil, 0, time.Time{})
        if conn == nil {
            conn, _ = pool.clean.Pop()
        }
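
Worth noting about the setCapacity call above: it passes a zero timeout and a zero time.Time, and since pop only runs its staleness loop when the timeout is positive, shrinking the pool reclaims connections without also counting them as idle-closed. A minimal illustration of that guard follows; the helper name is made up for the example.

// idleCheckEnabled mirrors the `if timeout > 0` guard in pop: with the zero
// timeout used by setCapacity, connections are never idle-closed on that path.
func idleCheckEnabled(timeout time.Duration) bool {
    return timeout > 0
}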
@@ -723,39 +734,6 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
    return nil
}

-func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
-    timeout := pool.IdleTimeout()
-    if timeout == 0 {
-        return
-    }
-    if pool.Capacity() == 0 {
-        return
-    }
-
-    var conns []*Pooled[C]
-
-    closeInStack := func(s *connStack[C]) {
-        conns = s.PopAll(conns[:0])
-        slices.Reverse(conns)
-
-        for _, conn := range conns {
-            if conn.timeUsed.Add(timeout).Sub(now) < 0 {
-                pool.Metrics.idleClosed.Add(1)
-                conn.Close()
-                pool.closedConn()
-                continue
-            }
-
-            s.Push(conn)
-        }
-    }
-
-    for i := 0; i <= stackMask; i++ {
-        closeInStack(&pool.settings[i])
-    }
-    closeInStack(&pool.clean)
-}
-
func (pool *ConnPool[C]) StatsJSON() map[string]any {
    return map[string]any{
        "Capacity": int(pool.Capacity()),
17 changes: 16 additions & 1 deletion go/pools/smartconnpool/pool_test.go
@@ -619,8 +619,23 @@ func TestIdleTimeout(t *testing.T) {
        p.put(conn)
    }

+    // wait again; the connections will timeout inside the pool, but we'll only notice
+    // when trying to pop them again
+    time.Sleep(1 * time.Second)
+
+    for i := 0; i < 5; i++ {
+        r, err := p.Get(ctx, setting)
+        require.NoError(t, err)
+
+        p.put(r)
+    }
+
    for _, closed := range closers {
-        <-closed
+        select {
+        case <-closed:
+        default:
+            t.Fatalf("Connections remain open after 1 second")
+        }
    }

    // no need to assert anything: all the connections in the pool should are idle-closed
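
The updated test reflects the new lifecycle: after sleeping past the idle timeout it has to Get (and put back) the connections once more, because closing now only happens when a stack is popped, and it then asserts that every per-connection close signal has already fired, using a non-blocking receive. A self-contained illustration of that select/default pattern follows; the names are hypothetical, not taken from the test file.

package main

import "fmt"

// isClosed reports, without blocking, whether the signal channel has fired.
func isClosed(closed <-chan struct{}) bool {
    select {
    case <-closed:
        return true
    default:
        return false
    }
}

func main() {
    ch := make(chan struct{})
    fmt.Println(isClosed(ch)) // false: nothing has signalled yet
    close(ch)
    fmt.Println(isClosed(ch)) // true: receiving from a closed channel succeeds immediately
}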
