From ab13502a35fc6ca4b373fd8a2bc89fee1f94f711 Mon Sep 17 00:00:00 2001 From: "Hristo (Izo) Gueorguiev" <53634432+izo0x90@users.noreply.github.com> Date: Mon, 17 Feb 2025 11:19:39 -0500 Subject: [PATCH] Use interprocess pipe to signal exec error in forked process Implements canonical fork/ exec error signaling process that uses a pipe configure to close on `exec` sys. call to notify the parent process if an error occurs in the child and it is unable to `exec` the desired executable file - Implement Pipe abstraction - Add `read_bytes` capability to `FileDecscriptor` - Add file descriptor controls function to Libc. bindings - Fix previously broken test for rasing on exec errors Refs: - https://cr.yp.to/docs/selfpipe.html Signed-off-by: Hristo I. Gueorguiev <53634432+izo0x90@users.noreply.github.com> --- stdlib/src/os/process.mojo | 159 ++++++++++++++++++++++++++++++- stdlib/test/os/test_process.mojo | 30 +++--- 2 files changed, 168 insertions(+), 21 deletions(-) diff --git a/stdlib/src/os/process.mojo b/stdlib/src/os/process.mojo index 0ae5abef433..ff870e6804a 100644 --- a/stdlib/src/os/process.mojo +++ b/stdlib/src/os/process.mojo @@ -18,18 +18,140 @@ Example: from os import Process ``` """ +from collections import Optional +from collections.string import StringSlice from sys import ( - external_call, os_is_linux, os_is_macos, os_is_windows, ) -from sys._libc import vfork, execvp, kill, SignalCodes -from sys.ffi import OpaquePointer, c_char, c_int, c_str_ptr +from sys._libc import ( + vfork, + execvp, + exit, + kill, + SignalCodes, + pipe, + fcntl, + FcntlCommands, + FcntlFDFlags, + close, +) +from sys.ffi import c_char, c_int, c_str_ptr from sys.os import sep -from memory import UnsafePointer +from memory import Span, UnsafePointer + + +# ===----------------------------------------------------------------------=== # +# Process comm. +# ===----------------------------------------------------------------------=== # +struct Pipe: + """Create a pipe for interprocess communication. + + Example usage: + ``` + pipe().write_bytes("TEST".as_bytes()) + ``` + """ + + var fd_in: Optional[FileDescriptor] + """File descriptor for pipe input.""" + var fd_out: Optional[FileDescriptor] + """File descriptor for pipe output.""" + + fn __init__( + mut self, + in_close_on_exec: Bool = False, + out_close_on_exec: Bool = False, + ) raises: + """Struct to manage interprocess pipe comms. + + Args: + in_close_on_exec: Close the read side of pipe if `exec` sys. call is issued in process. + out_close_on_exec: Close the write side of pipe if `exec` sys. call is issued in process. + """ + var pipe_fds = UnsafePointer[c_int].alloc(2) + if pipe(pipe_fds) < 0: + pipe_fds.free() + raise Error("Failed to create pipe") + + if in_close_on_exec: + if not self._set_close_on_exec(pipe_fds[0]): + pipe_fds.free() + raise Error("Failed to configure input pipe close on exec") + + if out_close_on_exec: + if not self._set_close_on_exec(pipe_fds[1]): + pipe_fds.free() + raise Error("Failed to configure output pipe close on exec") + + self.fd_in = FileDescriptor(Int(pipe_fds[0])) + self.fd_out = FileDescriptor(Int(pipe_fds[1])) + pipe_fds.free() + + fn __del__(owned self): + """Ensures pipes input and output file descriptors are closed, when the object is destroyed. + """ + self.set_input_only() + self.set_output_only() + + @staticmethod + fn _set_close_on_exec(fd: c_int) -> Bool: + return ( + fcntl( + fd, + FcntlCommands.F_SETFD, + fcntl(fd, FcntlCommands.F_GETFD, 0) | FcntlFDFlags.FD_CLOEXEC, + ) + == 0 + ) + + @always_inline + fn set_input_only(mut self): + """Close the output descriptor/ channel for this side of the pipe.""" + if self.fd_out: + _ = close(rebind[Int](self.fd_out.value())) + self.fd_out = None + + @always_inline + fn set_output_only(mut self): + """Close the input descriptor/ channel for this side of the pipe.""" + if self.fd_in: + _ = close(rebind[Int](self.fd_in.value())) + self.fd_in = None + + @always_inline + fn write_bytes(mut self, bytes: Span[Byte, _]) raises: + """ + Write a span of bytes to the pipe. + + Args: + bytes: The byte span to write to this pipe. + + """ + if self.fd_out: + self.fd_out.value().write_bytes(bytes) + else: + raise Error("Can not write from read only side of pipe") + + @always_inline + fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]: + """ + Read a span of bytes from this pipe. + + Args: + size: The number of bytes to read from this pipe. + + Returns: + Span of bytes with len=size read from this pipe. + """ + if self.fd_in: + return self.fd_in.value().read_bytes(size) + + raise Error("Can not read from write only side of pipe") + # ===----------------------------------------------------------------------=== # # Process execution @@ -103,8 +225,15 @@ struct Process: @parameter if os_is_linux() or os_is_macos(): var file_name = path.split(sep)[-1] + var pipe = Pipe(out_close_on_exec=True) + var exec_err_code = String("EXEC_ERR") + var pid = vfork() + if pid == 0: + """Child process.""" + pipe.set_output_only() + var arg_count = len(argv) var argv_array_ptr_cstr_ptr = UnsafePointer[c_str_ptr].alloc( arg_count + 2 @@ -125,10 +254,30 @@ struct Process: # This will only get reached if exec call fails to replace currently executing code argv_array_ptr_cstr_ptr.free() - raise Error("Failed to execute " + path) + + # Canonical fork/ exec error handling pattern of using a pipe that closes on exec is + # used to signal error to parent process `https://cr.yp.to/docs/selfpipe.html` + pipe.write_bytes(exec_err_code.as_bytes()) + + exit(1) + elif pid < 0: raise Error("Unable to fork parent") + pipe.set_input_only() + var err: Optional[Span[Byte, MutableAnyOrigin]] + try: + err = pipe.read_bytes(exec_err_code.byte_length()) + except e: + err = None + + if ( + err + and len(err.value()) > 0 + and StringSlice(unsafe_from_utf8=err.value()) == exec_err_code + ): + raise Error("Failed to execute " + path) + return Process(child_pid=pid) elif os_is_windows(): constrained[ diff --git a/stdlib/test/os/test_process.mojo b/stdlib/test/os/test_process.mojo index f3c098bf58f..53a0d95309b 100644 --- a/stdlib/test/os/test_process.mojo +++ b/stdlib/test/os/test_process.mojo @@ -20,26 +20,24 @@ from os import Process from testing import assert_false, assert_raises -# CHECK-LABEL: TEST +# CHECK-LABEL: TEST_ECHO def test_process_run(): - _ = Process.run("echo", List[String]("== TEST")) + _ = Process.run("echo", List[String]("== TEST_ECHO")) -# def test_process_run_missing(): -# # assert_raises does not work with exception raised in child process -# # crashes with thread error -# missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN" -# -# # verify that the test file does not exist before starting the test -# assert_false( -# exists(missing_executable_file), -# "Unexpected file '" + missing_executable_file + "' it should not exist", -# ) -# -# # Forking appears to break asserts -# with assert_raises(): -# _ = Process.run(missing_executable_file, List[String]()) +def test_process_run_missing(): + missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN" + + # verify that the test file does not exist before starting the test + assert_false( + exists(missing_executable_file), + "Unexpected file '" + missing_executable_file + "' it should not exist", + ) + + with assert_raises(): + _ = Process.run(missing_executable_file, List[String]()) def main(): test_process_run() + test_process_run_missing()