diff --git a/spec/TCP.Spec.savi b/spec/TCP.Spec.savi index 6a46f96..75fd67c 100644 --- a/spec/TCP.Spec.savi +++ b/spec/TCP.Spec.savi @@ -77,14 +77,6 @@ TCP.auth(@env.root).connect.to("localhost", port) ) - // TODO: Can we make this trigger io_react with IO.Action.OpenFailed - // automatically via the same mechanism we will use for queuing later - // pending reads, instead of checking for this error case here? - if (@io.connect_error != OSError.None) ( - @env.err.print("[EchoClient] Failed to connect:") - @env.err.print(@io.connect_error.name) - ) - :fun ref io_react(action IO.Action) case action == ( | IO.Action.Opened | @@ -101,8 +93,7 @@ try @io.flush! | IO.Action.OpenFailed | - @env.err.print("[EchoClient] Failed to connect:") - @env.err.print(@io.connect_error.name) + @env.err.print("[EchoClient] Failed to connect.") | IO.Action.Read | if (@io.read_stream.bytes_ahead_of_marker >= b"Hello, World!".size) ( diff --git a/src/TCP.Engine.savi b/src/TCP.Engine.savi index 292e7e7..0a60cd4 100644 --- a/src/TCP.Engine.savi +++ b/src/TCP.Engine.savi @@ -2,31 +2,46 @@ :is IO.Engine(IO.Action) :var io IO.CoreEngine :var _listener (IO.Actor(IO.Action) | None): None - :var connect_error OSError: OSError.None + :var _pending_connect_count I32: 0 :let read_stream: ByteStream.Reader.new :let write_stream ByteStream.Writer - :new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket) - @io = try ( - // TODO: The IO package shouldn't expose this unsafe interface that - // could be used to circumvent the capability security of the TCP package. - // Instead, the relevant code should be carefully moved to this package. - IO.CoreEngine.new_tcp_connect!( - actor - ticket.host - ticket.port - ticket.from_port - ) + :fun non _asio_flags + if Platform.is_windows ( + AsioEvent.Flags.read_write | - @connect_error = OSError.EINVAL - IO.CoreEngine.new(AsioEvent.ID.null) // an invalid one + AsioEvent.Flags.read_write_oneshot ) + + :: Create a new TCP engine based on an outbound connection. + :: + :: The given `ticket` specifies the connection details, and also proves + :: (via capability security) that the caller has authority to connect. + :new (actor IO.Actor(IO.Action), ticket TCP.Connect.Ticket) + // Begin with an "empty" IO core engine - we'll fill it later + // after one of the attempted TCP connections succeeds. + @io = IO.CoreEngine.new(AsioEvent.ID.null) @write_stream = ByteStream.Writer.new(@io) - :new accept( - actor IO.Actor(IO.Action) - ticket TCP.Accept.Ticket - ) + // If IPv4 and IPv6 resolutions are both possible, the runtime will try to + // connect with both parallel; we'll later adopt whichever succeeds first. + @_pending_connect_count = _FFI.pony_os_connect_tcp( + actor + ticket.host.cstring, ticket.port.cstring, ticket.from_port.cstring + @_asio_flags + ) + + // If we failed to resolve any valid connection attempts, send the actor + // a later IO action that will let it know that connection has failed. + if (@_pending_connect_count == 0) ( + actor.io_deferred_action(IO.Action.OpenFailed) + ) + + :: Create a new TCP engine based on an accepting an inbound connection. + :: + :: The given `ticket` is a single-use capability that originated in a + :: `TCP.Listen.Engine` that had an incoming connection available to accept. + :new accept(actor IO.Actor(IO.Action), ticket TCP.Accept.Ticket) actor.io_deferred_action(IO.Action.Opened) @io = IO.CoreEngine.new( _FFI.pony_asio_event_create(actor, ticket._fd, @_asio_flags, 0, True) @@ -34,15 +49,30 @@ @write_stream = ByteStream.Writer.new(@io) @_listener = ticket._listener - :fun non _asio_flags - if Platform.is_windows ( - AsioEvent.Flags.read_write - | - AsioEvent.Flags.read_write_oneshot - ) - :fun ref react(event AsioEvent) @ :yields IO.Action + // If we haven't adopted an event yet, and this one is ready to be adopted, + // try to adopt it now, as we expect it is one of our pending connections. + if (@io.is_waiting_to_open && event.is_writable) ( + try ( + @_pending_connect_count -= 1 + @io.adopt_event!(event) + yield IO.Action.Opened + | + // We failed to adopt it because it was a failed connection attempt. + // If there are no more pending connection attempts, our last one has + // failed and we have no choice but to admit final failure. + if (@_pending_connect_count == 0) ( + yield IO.Action.OpenFailed + ) + + // Return early because we don't want to do anything with this event + // after having failed to adopt it already. + return @ + ) + ) + + // Now, pass the event to the inner engine and react to its yielded actions. @io.react(event) -> (action | case action == ( | IO.Action.Closed | @@ -67,6 +97,7 @@ yield action ) ) + @ :fun ref close diff --git a/src/_FFI.savi b/src/_FFI.savi index 2fa4b6a..a80032b 100644 --- a/src/_FFI.savi +++ b/src/_FFI.savi @@ -7,6 +7,7 @@ :ffi pony_asio_event_unsubscribe(event AsioEvent.ID) None :ffi pony_asio_event_destroy(event AsioEvent.ID) None + :ffi pony_os_connect_tcp(owner AsioEvent.Actor, host CPointer(U8), service CPointer(U8), from CPointer(U8), asio_flags U32) I32 :ffi pony_os_listen_tcp(owner AsioEvent.Actor, host CPointer(U8), service CPointer(U8)) AsioEvent.ID :ffi pony_os_accept(event AsioEvent.ID) U32 :ffi pony_os_socket_close(fd U32) None