
Commit

Merge branch 'slack-19.0' into slack-19.0-vstream-patch
makinje16 authored Feb 19, 2025
2 parents 14e5707 + 3d13cf0 commit a1fcf9b
Showing 6 changed files with 267 additions and 72 deletions.
5 changes: 2 additions & 3 deletions go/pools/smartconnpool/connection.go
@@ -19,7 +19,6 @@ package smartconnpool
import (
"context"
"sync/atomic"
"time"
)

type Connection interface {
@@ -33,8 +32,8 @@ type Connection interface {

type Pooled[C Connection] struct {
next atomic.Pointer[Pooled[C]]
timeCreated time.Time
timeUsed time.Time
timeCreated timestamp
timeUsed timestamp
pool *ConnPool[C]

Conn C
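Note: the timestamp type that replaces the time.Time fields above is defined elsewhere in the package and is not part of this commit view. Judging only from its call sites in pool.go below (set, get, update, elapsed, borrow, expired, plus the monotonicNow/monotonicFromTime helpers), it behaves like an atomically updatable monotonic instant with sentinel states for "borrowed" and "expired". The following is only a rough sketch under those assumptions; the sentinel constants and exact semantics are guesses, not the actual implementation.

package smartconnpool

import (
	"math"
	"sync/atomic"
	"time"
)

// monotonicRoot anchors every timestamp so an instant can be stored as a
// single int64 duration instead of a full time.Time.
var monotonicRoot = time.Now()

// monotonicNow returns the time elapsed since the process-wide anchor.
func monotonicNow() time.Duration { return time.Since(monotonicRoot) }

// monotonicFromTime converts a wall-clock time onto the same monotonic scale.
func monotonicFromTime(now time.Time) time.Duration { return now.Sub(monotonicRoot) }

// Assumed sentinel values marking a timestamp as borrowed by a client or
// already expired by a background worker.
const (
	timestampBorrowed = math.MinInt64
	timestampExpired  = math.MaxInt64
)

// timestamp is an atomically updatable instant on the monotonic scale.
type timestamp struct {
	nano atomic.Int64
}

func (t *timestamp) set(mono time.Duration) { t.nano.Store(int64(mono)) }
func (t *timestamp) get() time.Duration     { return time.Duration(t.nano.Load()) }
func (t *timestamp) update()                { t.nano.Store(int64(monotonicNow())) }
func (t *timestamp) elapsed() time.Duration { return monotonicNow() - t.get() }

// borrow atomically claims the timestamp for a client; it fails if a
// background worker has already marked the connection as expired.
func (t *timestamp) borrow() bool {
	stamp := t.nano.Load()
	if stamp == timestampExpired || stamp == timestampBorrowed {
		return false
	}
	return t.nano.CompareAndSwap(stamp, timestampBorrowed)
}

// expired atomically marks the timestamp as expired if it has been idle for
// longer than timeout and is not currently borrowed by a client.
func (t *timestamp) expired(now, timeout time.Duration) bool {
	stamp := t.nano.Load()
	if stamp == timestampExpired || stamp == timestampBorrowed {
		return false
	}
	if now-time.Duration(stamp) < timeout {
		return false
	}
	return t.nano.CompareAndSwap(stamp, timestampExpired)
}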
131 changes: 84 additions & 47 deletions go/pools/smartconnpool/pool.go
@@ -18,7 +18,6 @@ package smartconnpool

import (
"context"
"slices"
"sync"
"sync/atomic"
"time"
@@ -156,7 +155,6 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
@@ -195,8 +193,14 @@ func (pool *ConnPool[C]) open() {

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
pool.wait.expire(false)
pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
maybeStarving := pool.wait.expire(false)

// Do not allow waiters to starve: if there are waiters in the queue
// and connections in the stacks, we could be starving them.
// Try popping a connection and handing it over to a waiter directly
for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
}
return true
})
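For context, the maybeStarving loop above relies on the pool's wait queue, which is outside this diff. The sketch below is a heavily simplified, mutex-based stand-in for the contract the worker appears to assume: expire(false) drops waiters whose context is done and reports how many are still queued, and tryReturnConn hands a connection straight to a waiter when one is present. Only the method names come from the call sites; the real implementation may differ significantly.

package smartconnpool

import (
	"context"
	"sync"
)

// waiter represents one blocked Get call (assumed shape).
type waiter[C Connection] struct {
	ctx  context.Context
	conn chan *Pooled[C] // buffered (capacity 1) so a handoff never blocks
}

// waitlist is a simplified stand-in for the pool's wait queue.
type waitlist[C Connection] struct {
	mu   sync.Mutex
	list []*waiter[C]
}

// expire removes waiters whose context has been cancelled (or all waiters,
// when force is true) and returns how many are still queued; the expire
// worker uses that count as an upper bound for direct handoffs.
func (wl *waitlist[C]) expire(force bool) int {
	wl.mu.Lock()
	defer wl.mu.Unlock()
	kept := wl.list[:0]
	for _, w := range wl.list {
		if force || w.ctx.Err() != nil {
			close(w.conn) // wake the expired waiter empty-handed
			continue
		}
		kept = append(kept, w)
	}
	wl.list = kept
	return len(kept)
}

// tryReturnConn hands conn directly to the oldest waiter, bypassing the
// connection stacks entirely, and reports whether a handoff happened.
func (wl *waitlist[C]) tryReturnConn(conn *Pooled[C]) bool {
	wl.mu.Lock()
	defer wl.mu.Unlock()
	if len(wl.list) == 0 {
		return false
	}
	w := wl.list[0]
	wl.list = wl.list[1:]
	w.conn <- conn
	return true
}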

@@ -382,29 +386,65 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
return
}
} else {
conn.timeUsed = time.Now()
conn.timeUsed.update()

lifetime := pool.extendedMaxLifetime()
if lifetime > 0 && time.Until(conn.timeCreated.Add(lifetime)) < 0 {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
if err := pool.connReopen(context.Background(), conn, conn.timeUsed); err != nil {
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
}
}
}

if !pool.wait.tryReturnConn(conn) {
connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
} else {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
pool.tryReturnConn(conn)
}

func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
if pool.wait.tryReturnConn(conn) {
return true
}
connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
} else {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
}
return false
}

func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
// retry-loop: pop a connection from the stack and atomically check whether
// its timeout has elapsed. If the timeout has elapsed, the borrow will fail,
// which means that a background worker has already marked this connection
// as stale and is in the process of shutting it down. If we successfully mark
// the timeout as borrowed, we know that background workers will not be able
// to expire this connection (even if it's still visible to them), so it's
// safe to return it
for conn, ok := stack.Pop(); ok; conn, ok = stack.Pop() {
if conn.timeUsed.borrow() {
return conn
}
}
return nil
}

func (pool *ConnPool[C]) tryReturnAnyConn() bool {
if conn := pool.pop(&pool.clean); conn != nil {
conn.timeUsed.update()
return pool.tryReturnConn(conn)
}
for u := 0; u <= stackMask; u++ {
if conn := pool.pop(&pool.settings[u]); conn != nil {
conn.timeUsed.update()
return pool.tryReturnConn(conn)
}
}
return false
}

func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
@@ -416,15 +456,15 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(extended)
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
var err error
dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
}

dbconn.timeUsed = now
dbconn.timeCreated = now
dbconn.timeUsed.set(now)
dbconn.timeCreated.set(now)
return nil
}

@@ -433,31 +473,27 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
if err != nil {
return nil, err
}
now := time.Now()
return &Pooled[C]{
timeCreated: now,
timeUsed: now,
pool: pool,
Conn: conn,
}, nil
pooled := &Pooled[C]{
pool: pool,
Conn: conn,
}
now := monotonicNow()
pooled.timeUsed.set(now)
pooled.timeCreated.set(now)
return pooled, nil
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
fresh := pool.freshSettingsStack.Load()
if fresh < 0 {
return nil
}

var start uint32
if setting == nil {
start = uint32(fresh)
start = uint32(pool.freshSettingsStack.Load())
} else {
start = setting.bucket
}

for i := uint32(0); i <= stackMask; i++ {
pos := (i + start) & stackMask
if conn, ok := pool.settings[pos].Pop(); ok {
if conn := pool.pop(&pool.settings[pos]); conn != nil {
return conn
}
}
@@ -491,7 +527,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
pool.Metrics.getCount.Add(1)

// best case: if there's a connection in the clean stack, return it right away
if conn, ok := pool.clean.Pop(); ok {
if conn := pool.pop(&pool.clean); conn != nil {
pool.borrowed.Add(1)
return conn, nil
}
@@ -527,7 +563,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
@@ -545,10 +581,10 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (

var err error
// best case: check if there's a connection in the setting stack where our Setting belongs
conn, _ := pool.settings[setting.bucket&stackMask].Pop()
conn := pool.pop(&pool.settings[setting.bucket&stackMask])
// if there's no connection with our setting, try popping a clean connection
if conn == nil {
conn, _ = pool.clean.Pop()
conn = pool.pop(&pool.clean)
}
// otherwise try opening a brand new connection and we'll apply the setting to it
if conn == nil {
@@ -587,7 +623,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
@@ -649,7 +685,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
conn = pool.pop(&pool.clean)
}
if conn == nil {
time.Sleep(delay)
@@ -671,21 +707,22 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
return
}

var conns []*Pooled[C]
mono := monotonicFromTime(now)

closeInStack := func(s *connStack[C]) {
conns = s.PopAll(conns[:0])
slices.Reverse(conns)

for _, conn := range conns {
if conn.timeUsed.Add(timeout).Sub(now) < 0 {
// Do a read-only, best-effort iteration over all the connections in this
// stack and atomically attempt to mark them as expired.
// Any connections that are marked as expired are _not_ removed from
// the stack; it's generally unsafe to remove nodes from the stack
// besides the head. When clients pop from the stack, they'll immediately
// notice the expired connection and ignore it.
// see: timestamp.expired
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
continue
}

s.Push(conn)
}
}

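Both pop above and the read-only walk in closeIdleResources lean on connStack, whose implementation is also outside this diff. Below is a simplified Treiber-stack sketch of just the interface they use — Push, Pop and Peek over the intrusive next pointer in Pooled. The real stack presumably also defends against the ABA problem (for example with a versioned head pointer), which this sketch deliberately ignores.

package smartconnpool

import "sync/atomic"

// connStack is sketched as a plain lock-free Treiber stack; ABA protection
// is omitted here for brevity.
type connStack[C Connection] struct {
	top atomic.Pointer[Pooled[C]]
}

// Push links conn in as the new head.
func (s *connStack[C]) Push(conn *Pooled[C]) {
	for {
		old := s.top.Load()
		conn.next.Store(old)
		if s.top.CompareAndSwap(old, conn) {
			return
		}
	}
}

// Pop unlinks and returns the current head, if any.
func (s *connStack[C]) Pop() (*Pooled[C], bool) {
	for {
		old := s.top.Load()
		if old == nil {
			return nil, false
		}
		if s.top.CompareAndSwap(old, old.next.Load()) {
			return old, true
		}
	}
}

// Peek returns the head without unlinking it. closeIdleResources walks the
// list from here via the next pointers without removing nodes, which is why
// expired connections are only marked (via timestamp.expired) and later
// skipped by pop, never unlinked in place.
func (s *connStack[C]) Peek() *Pooled[C] {
	return s.top.Load()
}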
77 changes: 76 additions & 1 deletion go/pools/smartconnpool/pool_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@@ -36,6 +37,7 @@ var (
type TestState struct {
lastID, open, close, reset atomic.Int64
waits []time.Time
mu sync.Mutex

chaos struct {
delayConnect time.Duration
@@ -45,6 +47,8 @@ }
}

func (ts *TestState) LogWait(start time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.waits = append(ts.waits, start)
}

Expand Down Expand Up @@ -615,8 +619,14 @@ func TestIdleTimeout(t *testing.T) {
p.put(conn)
}

time.Sleep(1 * time.Second)

for _, closed := range closers {
<-closed
select {
case <-closed:
default:
t.Fatalf("Connections remain open after 1 second")
}
}

// no need to assert anything: all the connections in the pool should be idle-closed by now
@@ -1080,3 +1090,68 @@ func TestApplySettingsFailure(t *testing.T) {
p.put(r)
}
}

func TestGetSpike(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [10]*Pooled[*TestConn]

// Ensure we have a pool with 5 available resources
for i := 0; i < 5; i++ {
r, err := p.Get(ctx, nil)

require.NoError(t, err)
resources[i] = r
assert.EqualValues(t, 5-i-1, p.Available())
assert.Zero(t, p.Metrics.WaitCount())
assert.Zero(t, len(state.waits))
assert.Zero(t, p.Metrics.WaitTime())
assert.EqualValues(t, i+1, state.lastID.Load())
assert.EqualValues(t, i+1, state.open.Load())
}

for i := 0; i < 5; i++ {
p.put(resources[i])
}

assert.EqualValues(t, 5, p.Available())
assert.EqualValues(t, 5, p.Active())
assert.EqualValues(t, 0, p.InUse())

for i := 0; i < 2000; i++ {
wg := sync.WaitGroup{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

errs := make(chan error, 80)

for j := 0; j < 80; j++ {
wg.Add(1)

go func() {
defer wg.Done()
r, err := p.Get(ctx, nil)
defer p.put(r)

if err != nil {
errs <- err
}
}()
}
wg.Wait()

if len(errs) > 0 {
t.Errorf("Error getting connection: %v", <-errs)
}

close(errs)
}
}