Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 63 additions & 24 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,35 +738,74 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
mono := monotonicFromTime(now)

closeInStack := func(s *connStack[C]) {
// Do a read-only best effort iteration of all the connection in this
// stack and atomically attempt to mark them as expired.
// Any connections that are marked as expired are _not_ removed from
// the stack; it's generally unsafe to remove nodes from the stack
// besides the head. When clients pop from the stack, they'll immediately
// notice the expired connection and ignore it.
// see: timestamp.expired
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn, ok := s.Pop()
if !ok {
// Early return to skip allocating slices when the stack is empty
return
}

conn.Close()
pool.closedConn()
activeConnections := pool.Active()

// Only expire up to ~half of the active connections at a time. This should
// prevent us from closing too many connections in one go which could lead to
// a lot of `.Get` calls being added to the waitlist if there's a sudden spike
// coming in _after_ connections were popped off the stack but _before_ being
// returned back to the pool. This is unlikely to happen, but better safe than sorry.
//
// We always expire at least one connection per stack per iteration to ensure
// that idle connections are eventually closed even in small pools.
//
// We will expire any additional connections in the next iteration of the idle closer.
expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1))
validConnections := make([]*Pooled[C], 0, activeConnections)

// Pop out connections from the stack until we get a `nil` connection
for ok {
if conn.timeUsed.expired(mono, timeout) {
expiredConnections = append(expiredConnections, conn)

// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db-connect-timeout-ms` config param.
c, err := pool.getNew(context.Background())
if err != nil {
// If we couldn't open a new connection, just continue
continue
if len(expiredConnections) == cap(expiredConnections) {
// We have collected enough connections for this iteration to expire
break
}
} else {
validConnections = append(validConnections, conn)
}

// opening a new connection might have raced with other goroutines,
// so it's possible that we got back `nil` here
if c != nil {
// Return the new connection to the pool
pool.tryReturnConn(c)
}
conn, ok = s.Pop()
}

// Return all the valid connections back to waiters or the stack
//
// The order here is not important - because we can't guarantee to
// restore the order we got the connections out of the stack anyway.
//
// If we return the connections in the order popped off the stack:
// * waiters will get the newest connection first
// * stack will have the oldest connections at the top of the stack.
//
// If we return the connections in reverse order:
// * waiters will get the oldest connection first
// * stack will have the newest connections at the top of the stack.
//
// Neither of these is better or worse than the other.
for _, conn := range validConnections {
pool.tryReturnConn(conn)
}

// Close all the expired connections and open new ones to replace them
for _, conn := range expiredConnections {
pool.Metrics.idleClosed.Add(1)

conn.Close()

err := pool.connReopen(context.Background(), conn, mono)
if err != nil {
pool.closedConn()
continue
}

pool.tryReturnConn(conn)
}
}

Expand Down
97 changes: 92 additions & 5 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,16 +1325,24 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {

// Try to get connections while they're being reopened
// This should trigger the bug where connections get discarded
wg := sync.WaitGroup{}

for i := 0; i < 2; i++ {
getCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
getCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()

conn, err := p.Get(getCtx, nil)
require.NoError(t, err)
conn, err := p.Get(getCtx, nil)
require.NoError(t, err)

p.put(conn)
p.put(conn)
}()
}

wg.Wait()

// Wait a moment for all reopening to complete
require.EventuallyWithT(t, func(c *assert.CollectT) {
// Check the actual number of currently open connections
Expand Down Expand Up @@ -1365,3 +1373,82 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
assert.Equal(t, int64(0), state.open.Load())
assert.Equal(t, int64(4), state.close.Load())
}

// TestIdleTimeoutDoesntLeaveLingeringConnection verifies that when the idle
// closer expires a full pool, every closed connection is replaced and the
// clean stack never accumulates more entries than the pool's capacity.
func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) {
	var state TestState

	ctx := context.Background()
	p := NewPool(&Config[*TestConn]{
		Capacity:    10,
		IdleTimeout: 50 * time.Millisecond,
		LogWait:     state.LogWait,
	}).Open(newConnector(&state), nil)

	defer p.Close()

	// Check out every connection so the pool is fully active, then return
	// them all so they sit idle on the stack awaiting expiry.
	var conns []*Pooled[*TestConn]
	for i := 0; i < 10; i++ {
		conn, err := p.Get(ctx, nil)
		require.NoError(t, err)
		conns = append(conns, conn)
	}

	for _, conn := range conns {
		p.put(conn)
	}

	require.EqualValues(t, 10, p.Active())
	require.EqualValues(t, 10, p.Available())

	// Wait a bit for the idle timeout worker to refresh connections.
	// Fix: the condition previously required strictly more than 10 closures
	// (`> 10`), contradicting the failure message ("at least 10") and
	// forcing the test to sit through a second idle cycle; `>= 10` matches
	// the stated intent.
	assert.Eventually(t, func() bool {
		return p.Metrics.IdleClosed() >= 10
	}, 500*time.Millisecond, 10*time.Millisecond, "Expected at least 10 connections to be closed by idle timeout")

	// Verify that new connections were created to replace the closed ones
	require.EqualValues(t, 10, p.Active())
	require.EqualValues(t, 10, p.Available())

	// Count how many connections are currently stacked; a lingering
	// (leaked) connection would push this past the pool's capacity.
	totalInStack := 0
	for conn := p.clean.Peek(); conn != nil; conn = conn.next.Load() {
		totalInStack++
	}

	require.LessOrEqual(t, totalInStack, 10)
}

// BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections measures the
// cost of a single closeIdleResources pass over a fully-populated pool in
// which no connection has exceeded the (long) idle timeout, i.e. the pure
// scanning overhead with nothing to expire.
func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing.B) {
	var state TestState

	const poolSize = 1000

	p := NewPool(&Config[*TestConn]{
		Capacity:    int64(poolSize),
		IdleTimeout: 30 * time.Second,
		LogWait:     state.LogWait,
	}).Open(newConnector(&state), nil)
	defer p.Close()

	// Check out the full capacity so every slot holds a live connection.
	held := make([]*Pooled[*TestConn], 0, poolSize)
	for i := 0; i < poolSize; i++ {
		conn, err := p.Get(context.Background(), nil)
		if err != nil {
			b.Fatal(err)
		}
		held = append(held, conn)
	}

	// Hand everything back so the idle stack is fully populated before timing.
	for _, conn := range held {
		conn.Recycle()
	}

	b.ResetTimer()

	for range b.N {
		p.closeIdleResources(time.Now())
	}
}
Loading