Skip to content

Commit

Permalink
Use interprocess pipe to signal exec error in forked process
Browse files Browse the repository at this point in the history
Implements canonical fork/ exec error signaling process that uses a pipe
configure to close on `exec` sys. call to notify the parent process if
an error occurs in the child and it is unable to `exec` the desired
executable file

- Implement Pipe abstraction
- Add `read_bytes` capability to `FileDecscriptor`
- Add file descriptor controls function to Libc. bindings
- Fix previously broken test for rasing on exec errors

Refs:
- https://cr.yp.to/docs/selfpipe.html

Signed-off-by: Hristo I. Gueorguiev <53634432+izo0x90@users.noreply.github.com>
  • Loading branch information
izo0x90 committed Feb 25, 2025
1 parent 6cb5721 commit ab13502
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 21 deletions.
159 changes: 154 additions & 5 deletions stdlib/src/os/process.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,140 @@ Example:
from os import Process
```
"""
from collections import Optional
from collections.string import StringSlice

from sys import (
external_call,
os_is_linux,
os_is_macos,
os_is_windows,
)
from sys._libc import vfork, execvp, kill, SignalCodes
from sys.ffi import OpaquePointer, c_char, c_int, c_str_ptr
from sys._libc import (
vfork,
execvp,
exit,
kill,
SignalCodes,
pipe,
fcntl,
FcntlCommands,
FcntlFDFlags,
close,
)
from sys.ffi import c_char, c_int, c_str_ptr
from sys.os import sep

from memory import UnsafePointer
from memory import Span, UnsafePointer


# ===----------------------------------------------------------------------=== #
# Process comm.
# ===----------------------------------------------------------------------=== #
struct Pipe:
"""Create a pipe for interprocess communication.
Example usage:
```
pipe().write_bytes("TEST".as_bytes())
```
"""

var fd_in: Optional[FileDescriptor]
"""File descriptor for pipe input."""
var fd_out: Optional[FileDescriptor]
"""File descriptor for pipe output."""

fn __init__(
mut self,
in_close_on_exec: Bool = False,
out_close_on_exec: Bool = False,
) raises:
"""Struct to manage interprocess pipe comms.
Args:
in_close_on_exec: Close the read side of pipe if `exec` sys. call is issued in process.
out_close_on_exec: Close the write side of pipe if `exec` sys. call is issued in process.
"""
var pipe_fds = UnsafePointer[c_int].alloc(2)
if pipe(pipe_fds) < 0:
pipe_fds.free()
raise Error("Failed to create pipe")

if in_close_on_exec:
if not self._set_close_on_exec(pipe_fds[0]):
pipe_fds.free()
raise Error("Failed to configure input pipe close on exec")

if out_close_on_exec:
if not self._set_close_on_exec(pipe_fds[1]):
pipe_fds.free()
raise Error("Failed to configure output pipe close on exec")

self.fd_in = FileDescriptor(Int(pipe_fds[0]))
self.fd_out = FileDescriptor(Int(pipe_fds[1]))
pipe_fds.free()

fn __del__(owned self):
"""Ensures pipes input and output file descriptors are closed, when the object is destroyed.
"""
self.set_input_only()
self.set_output_only()

@staticmethod
fn _set_close_on_exec(fd: c_int) -> Bool:
return (
fcntl(
fd,
FcntlCommands.F_SETFD,
fcntl(fd, FcntlCommands.F_GETFD, 0) | FcntlFDFlags.FD_CLOEXEC,
)
== 0
)

@always_inline
fn set_input_only(mut self):
"""Close the output descriptor/ channel for this side of the pipe."""
if self.fd_out:
_ = close(rebind[Int](self.fd_out.value()))
self.fd_out = None

@always_inline
fn set_output_only(mut self):
"""Close the input descriptor/ channel for this side of the pipe."""
if self.fd_in:
_ = close(rebind[Int](self.fd_in.value()))
self.fd_in = None

@always_inline
fn write_bytes(mut self, bytes: Span[Byte, _]) raises:
"""
Write a span of bytes to the pipe.
Args:
bytes: The byte span to write to this pipe.
"""
if self.fd_out:
self.fd_out.value().write_bytes(bytes)
else:
raise Error("Can not write from read only side of pipe")

@always_inline
fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]:
"""
Read a span of bytes from this pipe.
Args:
size: The number of bytes to read from this pipe.
Returns:
Span of bytes with len=size read from this pipe.
"""
if self.fd_in:
return self.fd_in.value().read_bytes(size)

raise Error("Can not read from write only side of pipe")


# ===----------------------------------------------------------------------=== #
# Process execution
Expand Down Expand Up @@ -103,8 +225,15 @@ struct Process:
@parameter
if os_is_linux() or os_is_macos():
var file_name = path.split(sep)[-1]
var pipe = Pipe(out_close_on_exec=True)
var exec_err_code = String("EXEC_ERR")

var pid = vfork()

if pid == 0:
"""Child process."""
pipe.set_output_only()

var arg_count = len(argv)
var argv_array_ptr_cstr_ptr = UnsafePointer[c_str_ptr].alloc(
arg_count + 2
Expand All @@ -125,10 +254,30 @@ struct Process:

# This will only get reached if exec call fails to replace currently executing code
argv_array_ptr_cstr_ptr.free()
raise Error("Failed to execute " + path)

# Canonical fork/ exec error handling pattern of using a pipe that closes on exec is
# used to signal error to parent process `https://cr.yp.to/docs/selfpipe.html`
pipe.write_bytes(exec_err_code.as_bytes())

exit(1)

elif pid < 0:
raise Error("Unable to fork parent")

pipe.set_input_only()
var err: Optional[Span[Byte, MutableAnyOrigin]]
try:
err = pipe.read_bytes(exec_err_code.byte_length())
except e:
err = None

if (
err
and len(err.value()) > 0
and StringSlice(unsafe_from_utf8=err.value()) == exec_err_code
):
raise Error("Failed to execute " + path)

return Process(child_pid=pid)
elif os_is_windows():
constrained[
Expand Down
30 changes: 14 additions & 16 deletions stdlib/test/os/test_process.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,24 @@ from os import Process
from testing import assert_false, assert_raises


# CHECK-LABEL: TEST
# CHECK-LABEL: TEST_ECHO
def test_process_run():
_ = Process.run("echo", List[String]("== TEST"))
_ = Process.run("echo", List[String]("== TEST_ECHO"))


# def test_process_run_missing():
# # assert_raises does not work with exception raised in child process
# # crashes with thread error
# missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN"
#
# # verify that the test file does not exist before starting the test
# assert_false(
# exists(missing_executable_file),
# "Unexpected file '" + missing_executable_file + "' it should not exist",
# )
#
# # Forking appears to break asserts
# with assert_raises():
# _ = Process.run(missing_executable_file, List[String]())
def test_process_run_missing():
missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN"

# verify that the test file does not exist before starting the test
assert_false(
exists(missing_executable_file),
"Unexpected file '" + missing_executable_file + "' it should not exist",
)

with assert_raises():
_ = Process.run(missing_executable_file, List[String]())


def main():
test_process_run()
test_process_run_missing()

0 comments on commit ab13502

Please sign in to comment.