Skip to content

Commit

Permalink
Merge pull request #85 from trilemma-dev/sequential-response-improvem…
Browse files Browse the repository at this point in the history
…ents

Various internal improvements to sequential response
  • Loading branch information
jakaplan authored Apr 7, 2022
2 parents e359311 + e3b24a5 commit b587133
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 108 deletions.
160 changes: 89 additions & 71 deletions Sources/SecureXPC/Client/XPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -448,23 +448,6 @@ public class XPCClient {

// MARK: Send (private internals)

/// Does the actual work of sending an XPC request which receives zero or more sequential responses.
private func sendRequest<S: Decodable>(_ request: Request, handler: @escaping XPCSequentialResponseHandler<S>) {
// Get the connection or inform the handler of failure and return
let connection: xpc_connection_t
do {
connection = try getConnection()
} catch {
handler(.failure(XPCError.asXPCError(error: error)))
return
}

let internalHandler = InternalXPCSequentialResponseHandlerImpl(request: request, handler: handler)
self.inProgressSequentialReplies.registerHandler(internalHandler, forRequest: request)

xpc_connection_send_message(connection, request.dictionary)
}

/// Does the actual work of sending an XPC request which receives a response.
private func sendRequest<R: Decodable>(_ request: Request,
withResponse handler: @escaping XPCResponseHandler<R>) {
Expand Down Expand Up @@ -533,6 +516,43 @@ public class XPCClient {
}
}

/// Does the actual work of sending an XPC request which receives zero or more sequential responses.
private func sendRequest<S: Decodable>(_ request: Request, handler: @escaping XPCSequentialResponseHandler<S>) {
// Get the connection or inform the handler of failure and return
let connection: xpc_connection_t
do {
connection = try getConnection()
} catch {
handler(.failure(XPCError.asXPCError(error: error)))
return
}

let internalHandler = InternalXPCSequentialResponseHandlerImpl(request: request, handler: handler)
self.inProgressSequentialReplies.registerHandler(internalHandler, forRequest: request)

// Sending with reply means the server ought to be kept alive until the reply is sent back
// From https://developer.apple.com/documentation/xpc/1505586-xpc_transaction_begin:
// A service with no outstanding transactions may automatically exit due to inactivity as determined by the
// system... If a reply message is created, the transaction will end when the reply message is sent or
// released.
xpc_connection_send_message_with_reply(connection, request.dictionary, nil) { reply in
// From xpc_connection_send_message documentation:
// If this API is used to send a message that is in reply to another message, there is no guarantee of
// ordering between the invocations of the connection's event handler and the reply handler for that
// message, even if they are targeted to the same queue.
//
// The net effect of this is we can't do much with the reply such as terminating the sequence because this
// reply can arrive before some of the out-of-band sends from the server to client do. (This was attempted
// and it caused unit tests to fail non-deterministically.)
//
// But if this is an internal XPC error (for example because the server shut down), we can use this to
// update the connection's state.
if xpc_get_type(reply) == XPC_TYPE_ERROR {
self.handleError(event: reply)
}
}
}

/// Populates the continuation while unwrapping any underlying errors thrown by a server's handler.
@available(macOS 10.15.0, *)
private func populateAsyncThrowingStreamContinuation<S: Decodable>(
Expand Down Expand Up @@ -567,28 +587,15 @@ public class XPCClient {
return newConnection
}

// MARK: Incoming event handling

private func handleEvent(event: xpc_object_t) {
if xpc_get_type(event) == XPC_TYPE_DICTIONARY {
do {
try self.handleMessage(event)
} catch {
// In theory these could be reported to some sort of error handler set on the client, but it's an
// intentional choice to not introduce that additional conceptual complexity for API users since
// in all other cases errors are associated with the specific send/sendMessage call.
}
self.inProgressSequentialReplies.handleMessage(event)
} else if xpc_get_type(event) == XPC_TYPE_ERROR {
self.handleError(event: event)
}
}

private func handleMessage(_ message: xpc_object_t) throws {
let requestID = try Response.decodeRequestID(dictionary: message)
guard let route = self.inProgressSequentialReplies.routeForRequestID(requestID) else {
return
}
let response = try Response(dictionary: message, route: route)
self.inProgressSequentialReplies.handleResponse(response)
}

private func handleError(event: xpc_object_t) {
if xpc_equal(event, XPC_ERROR_CONNECTION_INVALID) {
Expand Down Expand Up @@ -637,12 +644,16 @@ public class XPCClient {
}
}

// MARK: Sequential reply helpers

/// This allows for type erasure
fileprivate protocol InternalXPCSequentialResponseHandler {
var route: XPCRoute { get }
func handleResponse(_ response: Response)
}

fileprivate class InternalXPCSequentialResponseHandlerImpl<S: Decodable>: InternalXPCSequentialResponseHandler {
let route: XPCRoute
private var failedToDecode = false
private let handler: XPCClient.XPCSequentialResponseHandler<S>

Expand All @@ -651,35 +662,39 @@ fileprivate class InternalXPCSequentialResponseHandlerImpl<S: Decodable>: Intern
private let serialQueue: DispatchQueue

fileprivate init(request: Request, handler: @escaping XPCClient.XPCSequentialResponseHandler<S>) {
self.route = request.route
self.handler = handler
self.serialQueue = DispatchQueue(label: "response-handler-\(request.requestID)")
}

fileprivate func handleResponse(_ response: Response) {
self.serialQueue.async {
if !self.failedToDecode {
do {
if response.containsPayload {
self.handler(.success(try response.decodePayload(asType: S.self)))
} else if response.containsError {
self.handler(.failure(try response.decodeError()))
} else {
self.handler(.finished)
}
} catch {
/// If we failed to decode then we need to terminate the sequence; however, the server won't know this happened. (Even if we were
/// to inform the server it'd only be an optimization because the server could've already sent more replies in the interim.) We need to now
/// ignore any future replies to prevent adding to the now terminated sequence. This requirements comes from how iterating over an
/// async sequence works where if a call to `next()` throws then the iteration terminates an this is reflected in the documentation for
/// `AsyncThrowingStream`:
/// In contrast to AsyncStream, this type can throw an error from the awaited next(), which terminates the stream with the thrown
/// error.
///
/// While in theory we don't have to enforce this for the closure-based implementation, in principle and practice we want the closure and
/// async implementations to be as consistent as possible.
self.handler(.failure(XPCError.asXPCError(error: error)))
self.failedToDecode = true
if self.failedToDecode {
return
}

do {
if response.containsPayload {
self.handler(.success(try response.decodePayload(asType: S.self)))
} else if response.containsError {
self.handler(.failure(try response.decodeError()))
} else {
self.handler(.finished)
}
} catch {
// If we failed to decode then we need to terminate the sequence; however, the server won't know this
// happened. (Even if we were to inform the server it'd only be an optimization because the server
// could've already sent more replies in the interim.) We need to now ignore any future replies to
// prevent adding to the now terminated sequence. This requirement comes from how iterating over an
// async sequence works where if a call to `next()` throws then the iteration terminates. This is
// reflected in the documentation for `AsyncThrowingStream`:
// In contrast to AsyncStream, this type can throw an error from the awaited next(), which
// terminates the stream with the thrown error.
//
// While in theory we don't have to enforce this for the closure-based implementation, in principle and
// practice we want the closure and async implementations to be as consistent as possible.
self.handler(.failure(XPCError.asXPCError(error: error)))
self.failedToDecode = true
}
}
}
Expand All @@ -690,36 +705,39 @@ fileprivate class InternalXPCSequentialResponseHandlerImpl<S: Decodable>: Intern
fileprivate class InProgressSequentialReplies {
/// Mapping of requestIDs to handlers.
private var handlers = [UUID : InternalXPCSequentialResponseHandler]()
/// Mapping of requestIDs to routes.
private var routes = [UUID : XPCRoute]()
/// This queue is used to serialize access to the above dictionaries.
/// This queue is used to serialize access to the above dictionary.
private let serialQueue = DispatchQueue(label: String(describing: InProgressSequentialReplies.self))

func registerHandler(_ handler: InternalXPCSequentialResponseHandler, forRequest request: Request) {
serialQueue.sync {
handlers[request.requestID] = handler
routes[request.requestID] = request.route
}
}

func routeForRequestID(_ requestID: UUID) -> XPCRoute? {
serialQueue.sync {
routes[requestID]
serialQueue.async {
self.handlers[request.requestID] = handler
}
}

func handleResponse(_ response: Response) {
func handleMessage(_ message: xpc_object_t) {
serialQueue.async {
let response: Response
do {
let requestID = try Response.decodeRequestID(dictionary: message)
guard let route = self.handlers[requestID]?.route else {
return
}
response = try Response(dictionary: message, route: route)
} catch {
// In theory these could be reported to some sort of error handler set on the client, but it's an
// intentional choice to not introduce that additional conceptual complexity for API users because in
// all other cases errors are associated with the specific send/sendMessage call.
return
}

// Retrieve the handler, removing it if the sequence has finished or errored out
let handler: InternalXPCSequentialResponseHandler?
/// Remove the handler if the sequence has finished or errored out
if response.containsPayload {
handler = self.handlers[response.requestID]
} else if response.containsError {
handler = self.handlers.removeValue(forKey: response.requestID)
self.routes.removeValue(forKey: response.requestID)
} else { // Finished
handler = self.handlers.removeValue(forKey: response.requestID)
self.routes.removeValue(forKey: response.requestID)
}
guard let handler = handler else {
fatalError("Sequential result was received for an unregistered requestID: \(response.requestID)")
Expand Down
47 changes: 30 additions & 17 deletions Sources/SecureXPC/Server/SequentialResultProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import Foundation
/// - ``failure(error:)``
/// - ``isFinished``
public class SequentialResultProvider<S: Encodable> {
private let requestID: UUID
private let request: Request
private weak var server: XPCServer?
private weak var connection: xpc_connection_t?
private let serialQueue: DispatchQueue
Expand All @@ -46,7 +46,7 @@ public class SequentialResultProvider<S: Encodable> {
public private(set) var isFinished: Bool

init(request: Request, server: XPCServer, connection: xpc_connection_t) {
self.requestID = request.requestID
self.request = request
self.server = server
self.connection = connection
self.isFinished = false
Expand Down Expand Up @@ -99,16 +99,14 @@ public class SequentialResultProvider<S: Encodable> {

self.isFinished = isFinished

if let connection = self.connection {
guard let connection = self.connection else {
self.sendToServerErrorHandler(XPCError.clientNotConnected)
return
}

do {
var response = xpc_dictionary_create(nil, nil, 0)
do {
try Response.encodeRequestID(self.requestID, intoReply: &response)
} catch {
// If we're not able to encode the requestID, there's no point sending back a response as the client
// wouldn't be able to make use of it
self.sendToServerErrorHandler(error)
return
}
try Response.encodeRequestID(self.request.requestID, intoReply: &response)

do {
try encodingWork(&response)
Expand All @@ -118,24 +116,39 @@ public class SequentialResultProvider<S: Encodable> {

do {
let errorResponse = xpc_dictionary_create(nil, nil, 0)
try Response.encodeRequestID(self.requestID, intoReply: &response)
try Response.encodeRequestID(self.request.requestID, intoReply: &response)
try Response.encodeError(XPCError.asXPCError(error: error), intoReply: &response)
xpc_connection_send_message(connection, errorResponse)
self.isFinished = true
} catch {
// Unable to send back the error, so just bail
return
// Unable to send back the error, so there's nothing more to be done
}
}
} else {
self.sendToServerErrorHandler(XPCError.clientNotConnected)
} catch {
// If we're not able to encode the requestID, there's no point sending back a response as the client
// wouldn't be able to make use of it
self.sendToServerErrorHandler(error)
}

if isFinished {
self.endTransaction()
}
}
}

func sendToServerErrorHandler(_ error: Error) {
private func sendToServerErrorHandler(_ error: Error) {
if let server = server {
server.errorHandler.handle(XPCError.asXPCError(error: error))
}
}

/// Sends an empty reply to the client such that it ends the transaction
private func endTransaction() {
if let connection = connection {
guard let reply = xpc_dictionary_create_reply(self.request.dictionary) else {
fatalError("Unable to create reply for request \(request.requestID)")
}
xpc_connection_send_message(connection, reply)
}
}
}
Loading

0 comments on commit b587133

Please sign in to comment.