Skip to content

Commit

Permalink
Add callback support to FileDescription
Browse files Browse the repository at this point in the history
   - Implementing atomic reads for contiguous buffers
   - Supports read operations with callback-based completion.

Signed-off-by: shamb0 <r.raajey@gmail.com>
  • Loading branch information
shamb0 committed Jan 11, 2025
1 parent bd99bc7 commit 2226854
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 100 deletions.
35 changes: 27 additions & 8 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl<T: FileDescription + 'static> FileDescriptionExt for T {

pub type DynFileDescriptionRef = FileDescriptionRef<dyn FileDescription>;

/// Represents a dynamic callback for file I/O operations that is invoked upon completion.
/// The callback receives either the number of bytes successfully read (u64) or an IoError.
pub type DynFileDescriptionCallback<'tcx> = DynMachineCallback<'tcx, Result<u64, IoError>>;

impl FileDescriptionRef<dyn FileDescription> {
pub fn downcast<T: FileDescription + 'static>(self) -> Option<FileDescriptionRef<T>> {
let inner = self.into_rc_any().downcast::<FdIdWith<T>>().ok()?;
Expand All @@ -134,13 +138,14 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {

/// Reads as much as possible into the given buffer `ptr`.
/// `len` indicates how many bytes we should try to read.
/// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error.
/// `finish` Callback to be invoked on operation completion with bytes read or error
#[allow(dead_code)]
fn read<'tcx>(
self: FileDescriptionRef<Self>,
_communicate_allowed: bool,
_ptr: Pointer,
_len: usize,
_dest: &MPlaceTy<'tcx>,
_finish: DynFileDescriptionCallback<'tcx>,
_ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
throw_unsup_format!("cannot read from {}", self.name());
Expand Down Expand Up @@ -207,18 +212,32 @@ impl FileDescription for io::Stdin {
communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
finish: DynFileDescriptionCallback<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let mut bytes = vec![0; len];
// First handle isolation mode check
if !communicate_allowed {
// We want isolation mode to be deterministic, so we have to disallow all reads, even stdin.
helpers::isolation_abort_error("`read` from stdin")?;
}
let result = Read::read(&mut &*self, &mut bytes);
match result {
Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
Err(e) => ecx.set_last_error_and_return(e, dest),

let mut bytes = vec![0; len];

match Read::read(&mut &*self, &mut bytes) {
Ok(actual_read_size) => {
// Write the successfully read bytes to the destination pointer
ecx.write_bytes_ptr(ptr, bytes[..actual_read_size].iter().copied())?;

let Ok(read_size) = u64::try_from(actual_read_size) else {
throw_unsup_format!(
"Read operation returned size {} which exceeds maximum allowed value",
actual_read_size
)
};

finish.call(ecx, Ok(read_size))
}
Err(e) => finish.call(ecx, Err(e.into())),
}
}

Expand Down
27 changes: 23 additions & 4 deletions src/shims/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::io::ErrorKind;
use rustc_abi::Size;

use crate::helpers::check_min_arg_count;
use crate::shims::files::FileDescription;
use crate::shims::files::{DynFileDescriptionCallback, FileDescription};
use crate::shims::unix::linux_like::epoll::EpollReadyEvents;
use crate::shims::unix::*;
use crate::*;
Expand Down Expand Up @@ -203,7 +203,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(Scalar::from_i32(this.try_unwrap_io_result(result)?))
}

/// Read data from `fd` into buffer specified by `buf` and `count`.
/// Reads data from a file descriptor using callback-based completion.
///
/// If `offset` is `None`, reads data from current cursor position associated with `fd`
/// and updates cursor position on completion. Otherwise, reads from the specified offset
Expand Down Expand Up @@ -239,13 +239,32 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return(LibcError("EBADF"), dest);
};

trace!("read: FD mapped to {fd:?}");
trace!("Reading from FD {}, size {}, offset {:?}", fd_num, count, offset);
// We want to read at most `count` bytes. We are sure that `count` is not negative
// because it was a target's `usize`. Also we are sure that its smaller than
// `usize::MAX` because it is bounded by the host's `isize`.

// Clone the result destination for use in the completion callback
let result_destination = dest.clone();

let completion_callback: DynFileDescriptionCallback<'tcx> = callback!(
@capture<'tcx> {
result_destination: MPlaceTy<'tcx>,
}
|this, read_result: Result<u64, IoError>| {
match read_result {
Ok(read_size) => {
this.write_int(read_size, &result_destination)
}
Err(_err_code) => {
this.set_last_error_and_return(LibcError("EIO"), &result_destination)
}
}
}
);

match offset {
None => fd.read(communicate, buf, count, dest, this)?,
None => fd.read(communicate, buf, count, completion_callback, this)?,
Some(offset) => {
let Ok(offset) = u64::try_from(offset) else {
return this.set_last_error_and_return(LibcError("EINVAL"), dest);
Expand Down
146 changes: 135 additions & 11 deletions src/shims/unix/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use rustc_data_structures::fx::FxHashMap;

use self::shims::time::system_time_to_duration;
use crate::helpers::check_min_arg_count;
use crate::shims::files::{EvalContextExt as _, FileDescription, FileDescriptionRef};
use crate::shims::files::{
DynFileDescriptionCallback, EvalContextExt as _, FileDescription, FileDescriptionRef,
WeakFileDescriptionRef,
};
use crate::shims::os_str::bytes_to_os_str;
use crate::shims::unix::fd::{FlockOp, UnixFileDescription};
use crate::*;
Expand All @@ -23,6 +26,92 @@ use crate::*;
struct FileHandle {
file: File,
writable: bool,
/// Mutex for synchronizing file access across threads.
file_lock: MutexRef,
}

impl VisitProvenance for FileHandle {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance tracking needed for FileHandle as it contains no references.
// This implementation satisfies the trait requirement but performs no operations.
}
}

impl FileHandle {
/// Creates a new FileHandle with specified permissions and synchronization primitive.
fn new(file: File, writable: bool, file_lock: MutexRef) -> Self {
Self { file, writable, file_lock }
}

/// Attempts to create a clone of the file handle while preserving all attributes.
///
/// # Errors
/// Returns an `InterpResult` error if file handle cloning fails.
fn try_clone<'tcx>(&self) -> InterpResult<'tcx, FileHandle> {
let cloned_file = self
.file
.try_clone()
.map_err(|e| err_unsup_format!("Failed to clone file handle: {}", e))?;

interp_ok(FileHandle {
file: cloned_file,
writable: self.writable,
file_lock: self.file_lock.clone(),
})
}

/// Performs a synchronized file read with callback completion.
fn perform_read<'tcx>(
this: &mut MiriInterpCx<'tcx>,
finish: DynFileDescriptionCallback<'tcx>,
mut file_handle: FileHandle,
weak_fd: WeakFileDescriptionRef<FileHandle>,
buffer_ptr: Pointer,
length: usize,
) -> InterpResult<'tcx> {
this.mutex_lock(&file_handle.file_lock);

let result = {
// Verify file descriptor is still valid.
if weak_fd.upgrade().is_none() {
throw_unsup_format!("file got closed while blocking")
}

let mut bytes = vec![0; length];
let read_result = file_handle.file.read(&mut bytes);

// Handle the read result.
match read_result {
Ok(read_size) => {
// Write the bytes to memory.
if let Err(err_code) = this
.write_bytes_ptr(buffer_ptr, bytes[..read_size].iter().copied())
.report_err()
{
throw_unsup_format!(
"Memory write failed during file read operation: {:#?}",
err_code
)
}

let Ok(read_size) = u64::try_from(read_size) else {
throw_unsup_format!(
"Read operation returned size {} which exceeds maximum allowed value",
read_size
)
};

finish.call(this, Ok(read_size))
}
Err(err_code) => finish.call(this, Err(err_code.into())),
}
};

// Always unlock the mutex, even if the read operation failed.
this.mutex_unlock(&file_handle.file_lock)?;

result
}
}

impl FileDescription for FileHandle {
Expand All @@ -35,15 +124,42 @@ impl FileDescription for FileHandle {
communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
finish: DynFileDescriptionCallback<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let this = ecx;
assert!(communicate_allowed, "isolation should have prevented even opening a file");
let mut bytes = vec![0; len];
let result = (&mut &self.file).read(&mut bytes);
match result {
Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
Err(e) => ecx.set_last_error_and_return(e, dest),

// Clone the underlying File.
let clone_file_handle = match self.try_clone().report_err() {
Ok(handle) => handle,
Err(ec) => throw_unsup_format!("unable to clone file discp {:#?}", ec),
};

let weak_fd = FileDescriptionRef::downgrade(&self);

if this.mutex_is_locked(&self.file_lock) {
this.block_thread(
BlockReason::Mutex,
None,
callback!(
@capture<'tcx> {
finish: DynFileDescriptionCallback<'tcx>,
clone_file_handle: FileHandle,
weak_fd: WeakFileDescriptionRef<FileHandle>,
ptr: Pointer,
len: usize,
}
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len)
}
),
);

unreachable!()
} else {
FileHandle::perform_read(this, finish, clone_file_handle, weak_fd, ptr, len)
}
}

Expand Down Expand Up @@ -584,9 +700,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return_i32(ErrorKind::PermissionDenied);
}

let fd = options
.open(path)
.map(|file| this.machine.fds.insert_new(FileHandle { file, writable }));
let fd = options.open(path).map(|file| {
this.machine.fds.insert_new(FileHandle::new(
file,
writable,
this.machine.sync.mutex_create(),
))
});

interp_ok(Scalar::from_i32(this.try_unwrap_io_result(fd)?))
}
Expand Down Expand Up @@ -1645,7 +1765,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

match file {
Ok(f) => {
let fd = this.machine.fds.insert_new(FileHandle { file: f, writable: true });
let fd = this.machine.fds.insert_new(FileHandle::new(
f,
true,
this.machine.sync.mutex_create(),
));
return interp_ok(Scalar::from_i32(fd));
}
Err(e) =>
Expand Down
Loading

0 comments on commit 2226854

Please sign in to comment.