From e3b24a5f37ee946f59fde671994d00930d147e5c Mon Sep 17 00:00:00 2001 From: Josh Kaplan Date: Thu, 7 Apr 2022 23:55:45 +1200 Subject: [PATCH] Various internal improvements to sequential response The main improvement is that sequential replies use an XPC reply to keep open a transaction which should prevent the server from being terminated. --- Sources/SecureXPC/Client/XPCClient.swift | 160 ++++++++++-------- .../Server/SequentialResultProvider.swift | 47 +++-- Sources/SecureXPC/Server/XPCServer.swift | 43 +++-- .../Sequential Result Tests.swift | 3 +- Tests/SecureXPCTests/SecureXPCTests.swift | 6 - 5 files changed, 151 insertions(+), 108 deletions(-) delete mode 100644 Tests/SecureXPCTests/SecureXPCTests.swift diff --git a/Sources/SecureXPC/Client/XPCClient.swift b/Sources/SecureXPC/Client/XPCClient.swift index 851e852..9ef339b 100644 --- a/Sources/SecureXPC/Client/XPCClient.swift +++ b/Sources/SecureXPC/Client/XPCClient.swift @@ -448,23 +448,6 @@ public class XPCClient { // MARK: Send (private internals) - /// Does the actual work of sending an XPC request which receives zero or more sequential responses. - private func sendRequest(_ request: Request, handler: @escaping XPCSequentialResponseHandler) { - // Get the connection or inform the handler of failure and return - let connection: xpc_connection_t - do { - connection = try getConnection() - } catch { - handler(.failure(XPCError.asXPCError(error: error))) - return - } - - let internalHandler = InternalXPCSequentialResponseHandlerImpl(request: request, handler: handler) - self.inProgressSequentialReplies.registerHandler(internalHandler, forRequest: request) - - xpc_connection_send_message(connection, request.dictionary) - } - /// Does the actual work of sending an XPC request which receives a response. private func sendRequest(_ request: Request, withResponse handler: @escaping XPCResponseHandler) { @@ -533,6 +516,43 @@ public class XPCClient { } } + /// Does the actual work of sending an XPC request which receives zero or more sequential responses. + private func sendRequest(_ request: Request, handler: @escaping XPCSequentialResponseHandler) { + // Get the connection or inform the handler of failure and return + let connection: xpc_connection_t + do { + connection = try getConnection() + } catch { + handler(.failure(XPCError.asXPCError(error: error))) + return + } + + let internalHandler = InternalXPCSequentialResponseHandlerImpl(request: request, handler: handler) + self.inProgressSequentialReplies.registerHandler(internalHandler, forRequest: request) + + // Sending with reply means the server ought to be kept alive until the reply is sent back + // From https://developer.apple.com/documentation/xpc/1505586-xpc_transaction_begin: + // A service with no outstanding transactions may automatically exit due to inactivity as determined by the + // system... If a reply message is created, the transaction will end when the reply message is sent or + // released. + xpc_connection_send_message_with_reply(connection, request.dictionary, nil) { reply in + // From xpc_connection_send_message documentation: + // If this API is used to send a message that is in reply to another message, there is no guarantee of + // ordering between the invocations of the connection's event handler and the reply handler for that + // message, even if they are targeted to the same queue. + // + // The net effect of this is we can't do much with the reply such as terminating the sequence because this + // reply can arrive before some of the out-of-band sends from the server to client do. (This was attempted + // and it caused unit tests to fail non-deterministically.) + // + // But if this is an internal XPC error (for example because the server shut down), we can use this to + // update the connection's state. + if xpc_get_type(reply) == XPC_TYPE_ERROR { + self.handleError(event: reply) + } + } + } + /// Populates the continuation while unwrapping any underlying errors thrown by a server's handler. @available(macOS 10.15.0, *) private func populateAsyncThrowingStreamContinuation( @@ -567,28 +587,15 @@ public class XPCClient { return newConnection } + // MARK: Incoming event handling + private func handleEvent(event: xpc_object_t) { if xpc_get_type(event) == XPC_TYPE_DICTIONARY { - do { - try self.handleMessage(event) - } catch { - // In theory these could be reported to some sort of error handler set on the client, but it's an - // intentional choice to not introduce that additional conceptual complexity for API users since - // in all other cases errors are associated with the specific send/sendMessage call. - } + self.inProgressSequentialReplies.handleMessage(event) } else if xpc_get_type(event) == XPC_TYPE_ERROR { self.handleError(event: event) } } - - private func handleMessage(_ message: xpc_object_t) throws { - let requestID = try Response.decodeRequestID(dictionary: message) - guard let route = self.inProgressSequentialReplies.routeForRequestID(requestID) else { - return - } - let response = try Response(dictionary: message, route: route) - self.inProgressSequentialReplies.handleResponse(response) - } private func handleError(event: xpc_object_t) { if xpc_equal(event, XPC_ERROR_CONNECTION_INVALID) { @@ -637,12 +644,16 @@ public class XPCClient { } } +// MARK: Sequential reply helpers + /// This allows for type erasure fileprivate protocol InternalXPCSequentialResponseHandler { + var route: XPCRoute { get } func handleResponse(_ response: Response) } fileprivate class InternalXPCSequentialResponseHandlerImpl: InternalXPCSequentialResponseHandler { + let route: XPCRoute private var failedToDecode = false private let handler: XPCClient.XPCSequentialResponseHandler @@ -651,35 +662,39 @@ fileprivate class InternalXPCSequentialResponseHandlerImpl: Intern private let serialQueue: DispatchQueue fileprivate init(request: Request, handler: @escaping XPCClient.XPCSequentialResponseHandler) { + self.route = request.route self.handler = handler self.serialQueue = DispatchQueue(label: "response-handler-\(request.requestID)") } fileprivate func handleResponse(_ response: Response) { self.serialQueue.async { - if !self.failedToDecode { - do { - if response.containsPayload { - self.handler(.success(try response.decodePayload(asType: S.self))) - } else if response.containsError { - self.handler(.failure(try response.decodeError())) - } else { - self.handler(.finished) - } - } catch { - /// If we failed to decode then we need to terminate the sequence; however, the server won't know this happened. (Even if we were - /// to inform the server it'd only be an optimization because the server could've already sent more replies in the interim.) We need to now - /// ignore any future replies to prevent adding to the now terminated sequence. This requirements comes from how iterating over an - /// async sequence works where if a call to `next()` throws then the iteration terminates an this is reflected in the documentation for - /// `AsyncThrowingStream`: - /// In contrast to AsyncStream, this type can throw an error from the awaited next(), which terminates the stream with the thrown - /// error. - /// - /// While in theory we don't have to enforce this for the closure-based implementation, in principle and practice we want the closure and - /// async implementations to be as consistent as possible. - self.handler(.failure(XPCError.asXPCError(error: error))) - self.failedToDecode = true + if self.failedToDecode { + return + } + + do { + if response.containsPayload { + self.handler(.success(try response.decodePayload(asType: S.self))) + } else if response.containsError { + self.handler(.failure(try response.decodeError())) + } else { + self.handler(.finished) } + } catch { + // If we failed to decode then we need to terminate the sequence; however, the server won't know this + // happened. (Even if we were to inform the server it'd only be an optimization because the server + // could've already sent more replies in the interim.) We need to now ignore any future replies to + // prevent adding to the now terminated sequence. This requirement comes from how iterating over an + // async sequence works where if a call to `next()` throws then the iteration terminates. This is + // reflected in the documentation for `AsyncThrowingStream`: + // In contrast to AsyncStream, this type can throw an error from the awaited next(), which + // terminates the stream with the thrown error. + // + // While in theory we don't have to enforce this for the closure-based implementation, in principle and + // practice we want the closure and async implementations to be as consistent as possible. + self.handler(.failure(XPCError.asXPCError(error: error))) + self.failedToDecode = true } } } @@ -690,36 +705,39 @@ fileprivate class InternalXPCSequentialResponseHandlerImpl: Intern fileprivate class InProgressSequentialReplies { /// Mapping of requestIDs to handlers. private var handlers = [UUID : InternalXPCSequentialResponseHandler]() - /// Mapping of requestIDs to routes. - private var routes = [UUID : XPCRoute]() - /// This queue is used to serialize access to the above dictionaries. + /// This queue is used to serialize access to the above dictionary. private let serialQueue = DispatchQueue(label: String(describing: InProgressSequentialReplies.self)) func registerHandler(_ handler: InternalXPCSequentialResponseHandler, forRequest request: Request) { - serialQueue.sync { - handlers[request.requestID] = handler - routes[request.requestID] = request.route - } - } - - func routeForRequestID(_ requestID: UUID) -> XPCRoute? { - serialQueue.sync { - routes[requestID] + serialQueue.async { + self.handlers[request.requestID] = handler } } - func handleResponse(_ response: Response) { + func handleMessage(_ message: xpc_object_t) { serialQueue.async { + let response: Response + do { + let requestID = try Response.decodeRequestID(dictionary: message) + guard let route = self.handlers[requestID]?.route else { + return + } + response = try Response(dictionary: message, route: route) + } catch { + // In theory these could be reported to some sort of error handler set on the client, but it's an + // intentional choice to not introduce that additional conceptual complexity for API users because in + // all other cases errors are associated with the specific send/sendMessage call. + return + } + + // Retrieve the handler, removing it if the sequence has finished or errored out let handler: InternalXPCSequentialResponseHandler? - /// Remove the handler if the sequence has finished or errored out if response.containsPayload { handler = self.handlers[response.requestID] } else if response.containsError { handler = self.handlers.removeValue(forKey: response.requestID) - self.routes.removeValue(forKey: response.requestID) } else { // Finished handler = self.handlers.removeValue(forKey: response.requestID) - self.routes.removeValue(forKey: response.requestID) } guard let handler = handler else { fatalError("Sequential result was received for an unregistered requestID: \(response.requestID)") diff --git a/Sources/SecureXPC/Server/SequentialResultProvider.swift b/Sources/SecureXPC/Server/SequentialResultProvider.swift index 0eeba19..f06f7b6 100644 --- a/Sources/SecureXPC/Server/SequentialResultProvider.swift +++ b/Sources/SecureXPC/Server/SequentialResultProvider.swift @@ -32,7 +32,7 @@ import Foundation /// - ``failure(error:)`` /// - ``isFinished`` public class SequentialResultProvider { - private let requestID: UUID + private let request: Request private weak var server: XPCServer? private weak var connection: xpc_connection_t? private let serialQueue: DispatchQueue @@ -46,7 +46,7 @@ public class SequentialResultProvider { public private(set) var isFinished: Bool init(request: Request, server: XPCServer, connection: xpc_connection_t) { - self.requestID = request.requestID + self.request = request self.server = server self.connection = connection self.isFinished = false @@ -99,16 +99,14 @@ public class SequentialResultProvider { self.isFinished = isFinished - if let connection = self.connection { + guard let connection = self.connection else { + self.sendToServerErrorHandler(XPCError.clientNotConnected) + return + } + + do { var response = xpc_dictionary_create(nil, nil, 0) - do { - try Response.encodeRequestID(self.requestID, intoReply: &response) - } catch { - // If we're not able to encode the requestID, there's no point sending back a response as the client - // wouldn't be able to make use of it - self.sendToServerErrorHandler(error) - return - } + try Response.encodeRequestID(self.request.requestID, intoReply: &response) do { try encodingWork(&response) @@ -118,24 +116,39 @@ public class SequentialResultProvider { do { let errorResponse = xpc_dictionary_create(nil, nil, 0) - try Response.encodeRequestID(self.requestID, intoReply: &response) + try Response.encodeRequestID(self.request.requestID, intoReply: &response) try Response.encodeError(XPCError.asXPCError(error: error), intoReply: &response) xpc_connection_send_message(connection, errorResponse) self.isFinished = true } catch { - // Unable to send back the error, so just bail - return + // Unable to send back the error, so there's nothing more to be done } } - } else { - self.sendToServerErrorHandler(XPCError.clientNotConnected) + } catch { + // If we're not able to encode the requestID, there's no point sending back a response as the client + // wouldn't be able to make use of it + self.sendToServerErrorHandler(error) + } + + if isFinished { + self.endTransaction() } } } - func sendToServerErrorHandler(_ error: Error) { + private func sendToServerErrorHandler(_ error: Error) { if let server = server { server.errorHandler.handle(XPCError.asXPCError(error: error)) } } + + /// Sends an empty reply to the client such that it ends the transaction + private func endTransaction() { + if let connection = connection { + guard let reply = xpc_dictionary_create_reply(self.request.dictionary) else { + fatalError("Unable to create reply for request \(request.requestID)") + } + xpc_connection_send_message(connection, reply) + } + } } diff --git a/Sources/SecureXPC/Server/XPCServer.swift b/Sources/SecureXPC/Server/XPCServer.swift index c1cb218..cb1b3a5 100644 --- a/Sources/SecureXPC/Server/XPCServer.swift +++ b/Sources/SecureXPC/Server/XPCServer.swift @@ -284,7 +284,7 @@ public class XPCServer { _ route: XPCRouteWithoutMessageWithSequentialReply, handler: @escaping (SequentialResultProvider) -> Void ) { - let constrainedHandler = ConstrainedXPCHandlerWithoutMessageWithReplySequenceSync(handler: handler) + let constrainedHandler = ConstrainedXPCHandlerWithoutMessageWithSequentialReplySync(handler: handler) self.registerRoute(route.route, handler: constrainedHandler) } @@ -300,7 +300,7 @@ public class XPCServer { _ route: XPCRouteWithoutMessageWithSequentialReply, handler: @escaping (SequentialResultProvider) async -> Void ) { - let constrainedHandler = ConstrainedXPCHandlerWithoutMessageWithReplySequenceAsync(handler: handler) + let constrainedHandler = ConstrainedXPCHandlerWithoutMessageWithSequentialReplyAsync(handler: handler) self.registerRoute(route.route, handler: constrainedHandler) } @@ -315,7 +315,7 @@ public class XPCServer { _ route: XPCRouteWithMessageWithSequentialReply, handler: @escaping (M, SequentialResultProvider) -> Void ) { - let constrainedHandler = ConstrainedXPCHandlerWithMessageWithReplySequenceSync(handler: handler) + let constrainedHandler = ConstrainedXPCHandlerWithMessageWithSequentialReplySync(handler: handler) self.registerRoute(route.route, handler: constrainedHandler) } @@ -331,7 +331,7 @@ public class XPCServer { _ route: XPCRouteWithMessageWithSequentialReply, handler: @escaping (M, SequentialResultProvider) async -> Void ) { - let constrainedHandler = ConstrainedXPCHandlerWithMessageWithReplySequenceAsync(handler: handler) + let constrainedHandler = ConstrainedXPCHandlerWithMessageWithSequentialReplyAsync(handler: handler) self.registerRoute(route.route, handler: constrainedHandler) } @@ -399,22 +399,24 @@ public class XPCServer { if let handler = handler as? XPCHandlerSync { XPCRequestContext.setForCurrentThread(connection: connection, message: message) { - var reply = xpc_dictionary_create_reply(message) + var reply = handler.shouldCreateReply ? xpc_dictionary_create_reply(message) : nil do { try handler.handle(request: request, server: self, connection: connection, reply: &reply) try maybeSendReply(&reply, request: request, connection: connection) } catch { + var reply = handler.shouldCreateReply ? reply : xpc_dictionary_create_reply(message) self.handleError(error, request: request, connection: connection, reply: &reply) } } } else if #available(macOS 10.15.0, *), let handler = handler as? XPCHandlerAsync { XPCRequestContext.setForTask(connection: connection, message: message) { Task { - var reply = xpc_dictionary_create_reply(message) + var reply = handler.shouldCreateReply ? xpc_dictionary_create_reply(message) : nil do { try await handler.handle(request: request, server: self, connection: connection, reply: &reply) try maybeSendReply(&reply, request: request, connection: connection) } catch { + var reply = handler.shouldCreateReply ? reply : xpc_dictionary_create_reply(message) self.handleError(error, request: request, connection: connection, reply: &reply) } } @@ -659,7 +661,13 @@ extension XPCServer { // These wrappers perform type erasure via their implemented protocols while internally maintaining type constraints // This makes it possible to create heterogenous collections of them -fileprivate protocol XPCHandler {} +fileprivate protocol XPCHandler { + /// Whether as part of handling a request, an attempt should be made to create a reply. + /// + /// This doesn't necessarily mean the route actually has a reply type. This exists because for sequential reply types a reply should *not* be created as part + /// of request handling; it may be created later if the sequence completes. XPC only allows a reply object to be created exactly once per request. + var shouldCreateReply: Bool { get } +} fileprivate extension XPCHandler { @@ -723,6 +731,7 @@ fileprivate protocol XPCHandlerSync: XPCHandler { } fileprivate struct ConstrainedXPCHandlerWithoutMessageWithoutReplySync: XPCHandlerSync { + var shouldCreateReply = true let handler: () throws -> Void func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -732,6 +741,7 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithoutReplySync: XPCHandl } fileprivate struct ConstrainedXPCHandlerWithMessageWithoutReplySync: XPCHandlerSync { + var shouldCreateReply = true let handler: (M) throws -> Void func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -742,6 +752,7 @@ fileprivate struct ConstrainedXPCHandlerWithMessageWithoutReplySync: XPCHandlerSync { + var shouldCreateReply = true let handler: () throws -> R func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -752,6 +763,7 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithReplySync: XPCHandlerSync { + var shouldCreateReply = true let handler: (M) throws -> R func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -762,7 +774,8 @@ fileprivate struct ConstrainedXPCHandlerWithMessageWithReplySync: XPCHandlerSync { +fileprivate struct ConstrainedXPCHandlerWithoutMessageWithSequentialReplySync: XPCHandlerSync { + var shouldCreateReply = false let handler: (SequentialResultProvider) -> Void func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -772,7 +785,8 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithReplySequenceSync: XPCHandlerSync { +fileprivate struct ConstrainedXPCHandlerWithMessageWithSequentialReplySync: XPCHandlerSync { + var shouldCreateReply = false let handler: (M, SequentialResultProvider) -> Void func handle(request: Request, server: XPCServer, connection: xpc_connection_t, reply: inout xpc_object_t?) throws { @@ -797,6 +811,7 @@ fileprivate protocol XPCHandlerAsync: XPCHandler { @available(macOS 10.15.0, *) fileprivate struct ConstrainedXPCHandlerWithoutMessageWithoutReplyAsync: XPCHandlerAsync { + var shouldCreateReply = true let handler: () async throws -> Void func handle( @@ -812,6 +827,7 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithoutReplyAsync: XPCHand @available(macOS 10.15.0, *) fileprivate struct ConstrainedXPCHandlerWithMessageWithoutReplyAsync: XPCHandlerAsync { + var shouldCreateReply = true let handler: (M) async throws -> Void func handle( @@ -828,6 +844,7 @@ fileprivate struct ConstrainedXPCHandlerWithMessageWithoutReplyAsync: XPCHandlerAsync { + var shouldCreateReply = true let handler: () async throws -> R func handle( @@ -844,6 +861,7 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithReplyAsync: XPCHandlerAsync { + var shouldCreateReply = true let handler: (M) async throws -> R func handle( @@ -860,7 +878,8 @@ fileprivate struct ConstrainedXPCHandlerWithMessageWithReplyAsync: XPCHandlerAsync { +fileprivate struct ConstrainedXPCHandlerWithoutMessageWithSequentialReplyAsync: XPCHandlerAsync { + var shouldCreateReply = false let handler: (SequentialResultProvider) async -> Void func handle( @@ -876,7 +895,8 @@ fileprivate struct ConstrainedXPCHandlerWithoutMessageWithReplySequenceAsync: XPCHandlerAsync { +fileprivate struct ConstrainedXPCHandlerWithMessageWithSequentialReplyAsync: XPCHandlerAsync { + var shouldCreateReply = false let handler: (M, SequentialResultProvider) async -> Void func handle( @@ -892,7 +912,6 @@ fileprivate struct ConstrainedXPCHandlerWithMessageWithReplySequenceAsync