Skip to content

Commit

Permalink
Improve transaction handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett committed Feb 11, 2025
1 parent 712740b commit b07eeb5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 17 deletions.
50 changes: 50 additions & 0 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,56 @@ extension PostgresConnection {
throw error // rethrow with more metadata
}
}

/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
///
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
/// changes made within the closure.
///
/// - Parameters:
/// - logger: The `Logger` to log into for the transaction.
/// - file: The file, the transaction was started in. Used for better error reporting.
/// - line: The line, the transaction was started in. Used for better error reporting.
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
/// The connection must stay in the transaction mode. Otherwise this method will throw!
/// - Returns: The closure's return value.
public func withTransaction<Result>(
logger: Logger,
file: String = #file,
line: Int = #line,
_ process: (PostgresConnection) async throws -> Result
) async throws -> Result {
do {
try await self.query("BEGIN;", logger: logger)
} catch {
throw PostgresTransactionError(file: file, line: line, beginError: error)
}

var closureHasFinished: Bool = false
do {
let value = try await process(self)
closureHasFinished = true
try await self.query("COMMIT;", logger: logger)
return value
} catch {
var transactionError = PostgresTransactionError(file: file, line: line)
if !closureHasFinished {
transactionError.closureError = error
do {
try await self.query("ROLLBACK;", logger: logger)
} catch {
transactionError.rollbackError = error
}
} else {
transactionError.commitError = error
}

throw transactionError
}
}
}

// MARK: EventLoopFuture interface
Expand Down
21 changes: 21 additions & 0 deletions Sources/PostgresNIO/New/PostgresTransactionError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/// A wrapper around the errors that can occur during a transaction.
public struct PostgresTransactionError: Error {

/// The file in which the transaction was started
public var file: String
/// The line in which the transaction was started
public var line: Int

/// The error thrown when running the `BEGIN` query
public var beginError: Error?
/// The error thrown in the transaction closure
public var closureError: Error?

/// The error thrown while rolling the transaction back. If the ``closureError`` is set,
/// but the ``rollbackError`` is empty, the rollback was successful. If the ``rollbackError``
/// is set, the rollback failed.
public var rollbackError: Error?

/// The error thrown while commiting the transaction.
public var commitError: Error?
}
38 changes: 21 additions & 17 deletions Sources/PostgresNIO/Pool/PostgresClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -308,25 +308,29 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
return try await closure(connection)
}

/// Lease a connection for the provided `closure`'s lifetime.
/// A transation starts with call to withConnection
/// A transaction should end with a call to COMMIT or ROLLBACK
/// COMMIT is called upon successful completion and ROLLBACK is called should any steps fail
/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
///
/// - Parameter closure: A closure that uses the passed `PostgresConnection`. The closure **must not** capture
/// the provided `PostgresConnection`.
/// The function leases a connection from the underlying connection pool and starts a transaction by running a `BEGIN`
/// query on the leased connection against the database. It then lends the connection to the user provided closure.
/// The user can then modify the database as they wish. If the user provided closure returns successfully, the function
/// will attempt to commit the changes by running a `COMMIT` query against the database. If the user provided closure
/// throws an error, the function will attempt to rollback the changes made within the closure.
///
/// - Parameters:
/// - logger: The `Logger` to log into for the transaction.
/// - file: The file, the transaction was started in. Used for better error reporting.
/// - line: The line, the transaction was started in. Used for better error reporting.
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
/// The connection must stay in the transaction mode. Otherwise this method will throw!
/// - Returns: The closure's return value.
public func withTransaction<Result>(_ process: (PostgresConnection) async throws -> Result) async throws -> Result {
try await withConnection { connection in
try await connection.query("BEGIN;", logger: self.backgroundLogger)
do {
let value = try await process(connection)
try await connection.query("COMMIT;", logger: self.backgroundLogger)
return value
} catch {
try await connection.query("ROLLBACK;", logger: self.backgroundLogger)
throw error
}
public func withTransaction<Result>(
logger: Logger,
file: String = #file,
line: Int = #line,
_ closure: (PostgresConnection) async throws -> Result
) async throws -> Result {
try await self.withConnection { connection in
try await connection.withTransaction(logger: logger, file: file, line: line, closure)
}
}

Expand Down

0 comments on commit b07eeb5

Please sign in to comment.