Skip to content

Commit

Permalink
Fix crash in PoolStateMachine+ConnectionGroup when closing connection…
Browse files Browse the repository at this point in the history
… while keepAlive is running (#444)

Fixes #443.

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>
Co-authored-by: Fabian Fett <fabianfett@apple.com>
  • Loading branch information
3 people authored Dec 12, 2023
1 parent 54f491c commit e60e495
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 6 deletions.
16 changes: 11 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- swift:5.8-jammy
- swift:5.9-jammy
- swiftlang/swift:nightly-5.10-jammy
#- swiftlang/swift:nightly-main-jammy
- swiftlang/swift:nightly-main-jammy
include:
- swift-image: swift:5.9-jammy
code-coverage: true
Expand Down Expand Up @@ -133,7 +133,7 @@ jobs:
matrix:
postgres-formula:
# Only test one version on macOS, let Linux do the rest
- postgresql@15
- postgresql@16
postgres-auth:
# Only test one auth method on macOS, Linux tests will cover the others
- scram-sha-256
Expand All @@ -157,10 +157,16 @@ jobs:
- name: Install Postgres, setup DB and auth, and wait for server start
run: |
export PATH="$(brew --prefix)/opt/${POSTGRES_FORMULA}/bin:$PATH" PGDATA=/tmp/vapor-postgres-test
(brew unlink postgresql || true) && brew install "${POSTGRES_FORMULA}" && brew link --force "${POSTGRES_FORMULA}"
# ** BEGIN ** Work around bug in both Homebrew and GHA
(brew upgrade python@3.11 || true) && (brew link --force --overwrite python@3.11 || true)
(brew upgrade python@3.12 || true) && (brew link --force --overwrite python@3.12 || true)
brew upgrade
# ** END ** Work around bug in both Homebrew and GHA
brew install --overwrite "${POSTGRES_FORMULA}"
brew link --overwrite --force "${POSTGRES_FORMULA}"
initdb --locale=C --auth-host "${POSTGRES_AUTH_METHOD}" -U "${POSTGRES_USER}" --pwfile=<(echo "${POSTGRES_PASSWORD}")
pg_ctl start --wait
timeout-minutes: 2
timeout-minutes: 15
- name: Checkout code
uses: actions/checkout@v4
- name: Run all tests
Expand All @@ -183,7 +189,7 @@ jobs:
gh-codeql:
runs-on: ubuntu-latest
container: swift:5.8-jammy # CodeQL currently broken with 5.9
container: swift:5.9-jammy
permissions: { actions: write, contents: read, security-events: write }
steps:
- name: Check out code
Expand Down
2 changes: 1 addition & 1 deletion Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public final class ConnectionPool<
self.observabilityDelegate.keepAliveFailed(id: connection.id, error: error)

self.modifyStateAndRunActions { state in
state.stateMachine.connectionClosed(connection)
state.stateMachine.connectionKeepAliveFailed(connection.id)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,30 @@ extension PoolStateMachine {
return (index, context)
}

@inlinable
mutating func keepAliveFailed(_ connectionID: Connection.ID) -> CloseAction? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
// Connection has already been closed
return nil
}

guard let closeAction = self.connections[index].keepAliveFailed() else {
return nil
}

self.stats.idle -= 1
self.stats.closing += 1
self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0
self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams

// force unwrapping the connection is fine, because a close action due to failed
// keepAlive cannot happen without a connection
return CloseAction(
connection: closeAction.connection!,
timersToCancel: closeAction.cancelTimers
)
}

// MARK: Connection close/removal

@usableFromInline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ extension PoolStateMachine {
}
}

@inlinable
mutating func keepAliveFailed() -> CloseAction? {
return self.close()
}

@inlinable
mutating func timerScheduled(
_ timer: ConnectionTimer,
Expand Down
9 changes: 9 additions & 0 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ struct PoolStateMachine<
return self.handleAvailableConnection(index: index, availableContext: context)
}

@inlinable
mutating func connectionKeepAliveFailed(_ connectionID: ConnectionID) -> Action {
guard let closeAction = self.connections.keepAliveFailed(connectionID) else {
return .none()
}

return .init(request: .none, connection: .closeConnection(closeAction.connection, closeAction.timersToCancel))
}

@inlinable
mutating func connectionIdleTimerTriggered(_ connectionID: ConnectionID) -> Action {
precondition(self.requestQueue.isEmpty)
Expand Down
86 changes: 86 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,92 @@ final class ConnectionPoolTests: XCTestCase {
}
}

func testKeepAliveOnClose() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(20)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

async let lease1ConnectionAsync = pool.leaseConnection()

let connection = await factory.nextConnectAttempt { connectionID in
return 1
}

let lease1Connection = try await lease1ConnectionAsync
XCTAssert(connection === lease1Connection)

pool.releaseConnection(lease1Connection)

// keep alive 1

// validate that a keep alive timer and an idle timeout timer is scheduled
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
let deadline1 = await clock.nextTimerScheduled()
print(deadline1)
XCTAssertNotNil(expectedInstants.remove(deadline1))
let deadline2 = await clock.nextTimerScheduled()
print(deadline2)
XCTAssertNotNil(expectedInstants.remove(deadline2))
XCTAssert(expectedInstants.isEmpty)

// move clock forward to keep alive
let newTime = clock.now.advanced(by: keepAliveDuration)
clock.advance(to: newTime)

await keepAlive.nextKeepAlive { keepAliveConnection in
XCTAssertTrue(keepAliveConnection === lease1Connection)
return true
}

// keep alive 2
let deadline3 = await clock.nextTimerScheduled()
XCTAssertEqual(deadline3, clock.now.advanced(by: keepAliveDuration))
clock.advance(to: clock.now.advanced(by: keepAliveDuration))

let failingKeepAliveDidRun = ManagedAtomic(false)
// the following keep alive should not cause a crash
_ = try? await keepAlive.nextKeepAlive { keepAliveConnection in
defer {
XCTAssertFalse(failingKeepAliveDidRun
.compareExchange(expected: false, desired: true, ordering: .relaxed).original)
}
XCTAssertTrue(keepAliveConnection === lease1Connection)
keepAliveConnection.close()
throw CancellationError() // any error
} // will fail and it's expected
XCTAssertTrue(failingKeepAliveDidRun.load(ordering: .relaxed))

taskGroup.cancelAll()

for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testKeepAliveWorksRacesAgainstShutdown() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,35 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase {
XCTAssertEqual(afterPingIdleContext.use, .persisted)
XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))
}

func testKeepAliveShouldNotIndicateCloseConnectionAfterClosed() {
var connections = TestPoolStateMachine.ConnectionGroup(
generator: self.idGenerator,
minimumConcurrentConnections: 0,
maximumConcurrentConnectionSoftLimit: 2,
maximumConcurrentConnectionHardLimit: 2,
keepAlive: true,
keepAliveReducesAvailableStreams: true
)

guard let firstRequest = connections.createNewDemandConnectionIfPossible() else { return XCTFail("Expected to have a request here") }

let newConnection = MockConnection(id: firstRequest.connectionID)
let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1)
XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true))
XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))
_ = connections.parkConnection(at: connectionIndex, hasBecomeIdle: true)
let keepAliveTimer = TestPoolStateMachine.ConnectionTimer(timerID: 0, connectionID: firstRequest.connectionID, usecase: .keepAlive)
let keepAliveTimerCancellationToken = MockTimerCancellationToken(keepAliveTimer)
XCTAssertNil(connections.timerScheduled(keepAliveTimer, cancelContinuation: keepAliveTimerCancellationToken))
let keepAliveAction = connections.keepAliveIfIdle(newConnection.id)
XCTAssertEqual(keepAliveAction, .init(connection: newConnection, keepAliveTimerCancellationContinuation: keepAliveTimerCancellationToken))
XCTAssertEqual(connections.stats, .init(idle: 1, runningKeepAlive: 1, availableStreams: 0))

_ = connections.closeConnectionIfIdle(newConnection.id)
guard connections.keepAliveFailed(newConnection.id) == nil else {
return XCTFail("Expected keepAliveFailed not to cause close again")
}
XCTAssertEqual(connections.stats, .init(closing: 1))
}
}
111 changes: 111 additions & 0 deletions Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,115 @@ final class PoolStateMachineTests: XCTestCase {
XCTAssertEqual(releaseRequest1.connection, .none)
}

func testKeepAliveOnClosingConnection() {
var configuration = PoolConfiguration()
configuration.minimumConnectionCount = 0
configuration.maximumConnectionSoftLimit = 2
configuration.maximumConnectionHardLimit = 2
configuration.keepAliveDuration = .seconds(2)
configuration.idleTimeoutDuration = .seconds(4)


var stateMachine = TestPoolStateMachine(
configuration: configuration,
generator: .init(),
timerCancellationTokenType: MockTimerCancellationToken.self
)

// don't refill pool
let requests = stateMachine.refillConnections()
XCTAssertEqual(requests.count, 0)

// request connection while none exists
let request1 = MockRequest()
let leaseRequest1 = stateMachine.leaseConnection(request1)
XCTAssertEqual(leaseRequest1.connection, .makeConnection(.init(connectionID: 0), []))
XCTAssertEqual(leaseRequest1.request, .none)

// make connection 1
let connection1 = MockConnection(id: 0)
let createdAction1 = stateMachine.connectionEstablished(connection1, maxStreams: 1)
XCTAssertEqual(createdAction1.request, .leaseConnection(.init(element: request1), connection1))
XCTAssertEqual(createdAction1.connection, .none)
_ = stateMachine.releaseConnection(connection1, streams: 1)

// trigger keep alive
let keepAliveAction1 = stateMachine.connectionKeepAliveTimerTriggered(connection1.id)
XCTAssertEqual(keepAliveAction1.connection, .runKeepAlive(connection1, nil))

// fail keep alive and cause closed
let keepAliveFailed1 = stateMachine.connectionKeepAliveFailed(connection1.id)
XCTAssertEqual(keepAliveFailed1.connection, .closeConnection(connection1, []))
connection1.closeIfClosing()

// request connection while none exists anymore
let request2 = MockRequest()
let leaseRequest2 = stateMachine.leaseConnection(request2)
XCTAssertEqual(leaseRequest2.connection, .makeConnection(.init(connectionID: 1), []))
XCTAssertEqual(leaseRequest2.request, .none)

// make connection 2
let connection2 = MockConnection(id: 1)
let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1)
XCTAssertEqual(createdAction2.request, .leaseConnection(.init(element: request2), connection2))
XCTAssertEqual(createdAction2.connection, .none)
_ = stateMachine.releaseConnection(connection2, streams: 1)

// trigger keep alive while connection is still open
let keepAliveAction2 = stateMachine.connectionKeepAliveTimerTriggered(connection2.id)
XCTAssertEqual(keepAliveAction2.connection, .runKeepAlive(connection2, nil))

// close connection in the middle of keep alive
connection2.close()
connection2.closeIfClosing()

// fail keep alive and cause closed
let keepAliveFailed2 = stateMachine.connectionKeepAliveFailed(connection2.id)
XCTAssertEqual(keepAliveFailed2.connection, .closeConnection(connection2, []))
}

func testConnectionIsEstablishedAfterFailedKeepAliveIfNotEnoughConnectionsLeft() {
var configuration = PoolConfiguration()
configuration.minimumConnectionCount = 1
configuration.maximumConnectionSoftLimit = 2
configuration.maximumConnectionHardLimit = 2
configuration.keepAliveDuration = .seconds(2)
configuration.idleTimeoutDuration = .seconds(4)


var stateMachine = TestPoolStateMachine(
configuration: configuration,
generator: .init(),
timerCancellationTokenType: MockTimerCancellationToken.self
)

// refill pool
let requests = stateMachine.refillConnections()
XCTAssertEqual(requests.count, 1)

// one connection should exist
let request = MockRequest()
let leaseRequest = stateMachine.leaseConnection(request)
XCTAssertEqual(leaseRequest.connection, .none)
XCTAssertEqual(leaseRequest.request, .none)

// make connection 1
let connection = MockConnection(id: 0)
let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1)
XCTAssertEqual(createdAction.request, .leaseConnection(.init(element: request), connection))
XCTAssertEqual(createdAction.connection, .none)
_ = stateMachine.releaseConnection(connection, streams: 1)

// trigger keep alive
let keepAliveAction = stateMachine.connectionKeepAliveTimerTriggered(connection.id)
XCTAssertEqual(keepAliveAction.connection, .runKeepAlive(connection, nil))

// fail keep alive, cause closed and make new connection
let keepAliveFailed = stateMachine.connectionKeepAliveFailed(connection.id)
XCTAssertEqual(keepAliveFailed.connection, .closeConnection(connection, []))
let connectionClosed = stateMachine.connectionClosed(connection)
XCTAssertEqual(connectionClosed.connection, .makeConnection(.init(connectionID: 1), []))
connection.closeIfClosing()
}

}

0 comments on commit e60e495

Please sign in to comment.