Skip to content

Commit

Permalink
[GR-18163] Call unblock functions using JNI in CExtInterrupter
Browse files Browse the repository at this point in the history
PullRequest: truffleruby/4116
  • Loading branch information
eregon committed Jan 15, 2024
2 parents c68549d + 6bbd0de commit 44cf286
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Bug fixes:
* Handle a new variable inside the `case` target expression correctly (#3377, @eregon).
* The arguments of `Thread.new(*args, &block)` need to be marked as shared between multiple threads (#3179, @eregon).
* Fix `Range#bsearch` and raise `TypeError` when range boundaries are non-numeric and block not passed (@andrykonchin).
* Fix using the `--cpusampler` profiler when there are custom unblock functions for `rb_thread_call_without_gvl()` (#3013, @eregon).

Compatibility:

Expand Down
2 changes: 1 addition & 1 deletion lib/cext/ABI_check.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7
8
7 changes: 6 additions & 1 deletion lib/truffle/truffle/cext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1937,9 +1937,14 @@ def rb_thread_call_without_gvl(function, data1, unblock, data2)
end

private def rb_thread_call_without_gvl_inner(function, data1, unblock, data2)
if SULONG
Truffle::Interop.to_native(unblock)
Truffle::Interop.to_native(data2)
end

Primitive.call_with_unblocking_function(Thread.current,
POINTER_TO_POINTER_WRAPPER, function, data1,
POINTER_TO_VOID_WRAPPER, unblock, data2)
Truffle::Interop.as_pointer(unblock), Truffle::Interop.as_pointer(data2))
end

def rb_iterate(iteration, iterated_object, callback, callback_arg)
Expand Down
4 changes: 2 additions & 2 deletions spec/ruby/optional/capi/ext/thread_spec.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static VALUE thread_spec_rb_thread_call_without_gvl(VALUE self) {
}

/* This is unblocked by a signal. */
static void* blocking_gvl_func_for_udf_io(void *data) {
static void* blocking_gvl_func_for_ubf_io(void *data) {
int rfd = (int)(size_t)data;
char dummy;

Expand All @@ -89,7 +89,7 @@ static VALUE thread_spec_rb_thread_call_without_gvl_with_ubf_io(VALUE self) {
rb_raise(rb_eRuntimeError, "could not create pipe");
}

ret = rb_thread_call_without_gvl(blocking_gvl_func_for_udf_io,
ret = rb_thread_call_without_gvl(blocking_gvl_func_for_ubf_io,
(void*)(size_t)fds[0], RUBY_UBF_IO, 0);
close(fds[0]);
close(fds[1]);
Expand Down
1 change: 1 addition & 0 deletions spec/tags/optional/capi/thread_tags.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sulong(would need to enter the context to call the unblock function):C-API Thread function rb_thread_call_without_gvl runs a C function with the global lock unlocked and can be woken by a signal
4 changes: 4 additions & 0 deletions spec/tags/truffle/capi/thread_tags.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler
slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler
sulong(safepoint inside C code instead):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler
sulong(would need to enter the context to call the unblock function):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler
1 change: 1 addition & 0 deletions spec/tags/truffle/capi/unimplemented_tags.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
slow:Unimplemented functions in the C-API abort the process and show an error including the function name
sulong(different behavior):Unimplemented functions in the C-API abort the process and show an error including the function name
85 changes: 85 additions & 0 deletions spec/truffle/capi/ext/truffleruby_thread_spec.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "ruby/thread.h"
#include "rubyspec.h"

#include <errno.h>
#include <time.h>
#include <stdio.h>

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -21,9 +25,90 @@ static VALUE thread_spec_rb_thread_call_without_gvl_native_function(VALUE self)
return LONG2FIX(ret);
}

static void* call_check_ints(void* arg) {
rb_thread_check_ints();
return NULL;
}

static void* block_sleep(void* arg) {
struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 };
while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) {
// Similar to how ossl_pkey.c does it
rb_thread_call_with_gvl(call_check_ints, NULL);
}
return (void*) Qtrue;
}

static VALUE thread_spec_rb_thread_call_without_gvl_unblock_signal(VALUE self) {
return (VALUE) rb_thread_call_without_gvl(block_sleep, NULL, RUBY_UBF_IO, NULL);
}

static void* block(void* arg) {
int fd = *(int*)arg;
char buffer = ' ';
ssize_t r;

while (true) {
ssize_t r = read(fd, &buffer, 1);
if (r == 1) {
if (buffer == 'D') { // done
return (void*) Qtrue;
} else if (buffer == 'U') { // unblock
// Similar to how ossl_pkey.c does it
rb_thread_call_with_gvl(call_check_ints, NULL);
continue;
} else {
return (void*) rb_str_new(&buffer, 1);
}
} else {
perror("read() in blocking function returned != 1");
return (void*) Qfalse;
}
}
}

static void unblock(void* arg) {
int fd = *(int*)arg;
char buffer = 'U';
while (write(fd, &buffer, 1) == -1 && errno == EINTR) {
// retry
}
}

static VALUE finish(void* arg) {
int fd = *(int*)arg;

// Wait 1 second
struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 };
while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) {
// Sleep the remaining amount
}

char buffer = 'D';
while (write(fd, &buffer, 1) == -1 && errno == EINTR) {
// retry
}
return Qtrue;
}

static VALUE thread_spec_rb_thread_call_without_gvl_unblock_custom_function(VALUE self) {
int fds[2];
if (pipe(fds) == -1) {
rb_raise(rb_eRuntimeError, "could not create pipe");
}

VALUE thread = rb_funcall(rb_block_proc(), rb_intern("call"), 1, INT2FIX(fds[1]));

rb_thread_call_without_gvl(block, &fds[0], unblock, &fds[1]);

return rb_funcall(thread, rb_intern("join"), 0);
}

void Init_truffleruby_thread_spec(void) {
VALUE cls = rb_define_class("CApiTruffleRubyThreadSpecs", rb_cObject);
rb_define_method(cls, "rb_thread_call_without_gvl_native_function", thread_spec_rb_thread_call_without_gvl_native_function, 0);
rb_define_method(cls, "rb_thread_call_without_gvl_unblock_signal", thread_spec_rb_thread_call_without_gvl_unblock_signal, 0);
rb_define_method(cls, "rb_thread_call_without_gvl_unblock_custom_function", thread_spec_rb_thread_call_without_gvl_unblock_custom_function, 0);
}

#ifdef __cplusplus
Expand Down
18 changes: 17 additions & 1 deletion spec/truffle/capi/thread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

require_relative '../../ruby/optional/capi/spec_helper'

load_extension("truffleruby_thread")
extension_path = load_extension("truffleruby_thread")

describe "TruffleRuby C-API Thread function" do
before :each do
Expand All @@ -19,5 +19,21 @@
it "runs a native function with the global lock unlocked" do
@t.rb_thread_call_without_gvl_native_function.should == Process.pid
end

it "is unblocked with RUBY_UBF_IO when using CPUSampler" do
code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_signal"
out = ruby_exe(code, options: '--cpusampler')
out.should.include?('rb_thread_call_without_gvl_unblock_signal')
out.should.include?('rb_thread_call_without_gvl')
out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints
end

it "is unblocked with a custom unblock function when using CPUSampler" do
code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_custom_function { |fd| Thread.new { sleep 1; IO.for_fd(fd, autoclose: false).write 'D' } }"
out = ruby_exe(code, options: '--cpusampler')
out.should.include?('rb_thread_call_without_gvl_unblock_custom_function')
out.should.include?('rb_thread_call_without_gvl')
out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints
end
end
end
10 changes: 4 additions & 6 deletions spec/truffle/capi/unimplemented_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
extension_path = load_extension("unimplemented")

describe "Unimplemented functions in the C-API" do
guard_not -> { Truffle::Boot.get_option('cexts-sulong') } do
it "abort the process and show an error including the function name" do
expected_status = platform_is(:darwin) ? :SIGABRT : 127
out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status)
out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/
end
it "abort the process and show an error including the function name" do
expected_status = platform_is(:darwin) ? :SIGABRT : 127
out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status)
out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/
end
end
4 changes: 4 additions & 0 deletions spec/truffleruby.mspec
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ class MSpecScript
else
excludes << 'jvm'
end

if Truffle::Boot.get_option('cexts-sulong')
excludes << 'sulong'
end
end

if windows?
Expand Down
2 changes: 1 addition & 1 deletion src/main/c/cext/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void *rb_thread_call_with_gvl(gvl_call *function, void *data1) {

void* rb_thread_call_without_gvl(gvl_call *function, void *data1, rb_unblock_function_t *unblock_function, void *data2) {
if (unblock_function == RUBY_UBF_IO) {
unblock_function = (rb_unblock_function_t*) rb_tr_unwrap(Qnil);
unblock_function = (rb_unblock_function_t*) NULL;
}

return polyglot_invoke(RUBY_CEXT, "rb_thread_call_without_gvl", function, data1, unblock_function, data2);
Expand Down
9 changes: 9 additions & 0 deletions src/main/c/rubysignal/src/rubysignal.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@ JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_restoreSystemHa
signal(signo, SIG_DFL);
raise(signo);
}

// Declaration copied from lib/cext/include/ruby/internal/intern/thread.h
typedef void rb_unblock_function_t(void *);

JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_executeUnblockFunction(JNIEnv *env, jclass clazz, jlong function, jlong argument) {
rb_unblock_function_t* unblock_function = (rb_unblock_function_t*) function;
void* arg = (void*) argument;
unblock_function(arg);
}
64 changes: 10 additions & 54 deletions src/main/java/org/truffleruby/core/thread/ThreadNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.TimeUnit;

import com.oracle.truffle.api.RootCallTarget;
import com.oracle.truffle.api.TruffleContext;
import com.oracle.truffle.api.TruffleSafepoint;
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
import com.oracle.truffle.api.dsl.Bind;
Expand Down Expand Up @@ -109,13 +108,13 @@
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.InteropException;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.Shape;
import com.oracle.truffle.api.profiles.BranchProfile;
import com.oracle.truffle.api.source.SourceSection;
import org.truffleruby.signal.LibRubySignal;

import static org.truffleruby.language.SafepointPredicate.ALL_THREADS_AND_FIBERS;

Expand Down Expand Up @@ -658,23 +657,18 @@ public abstract static class CallWithUnblockingFunctionNode extends PrimitiveArr
@SuppressWarnings("truffle-neverdefault") // GR-43642
@Specialization(limit = "getCacheLimit()")
static Object call(
RubyThread thread,
Object wrapper,
Object function,
Object arg,
Object unblockWrapper,
Object unblocker,
Object unblockerArg,
RubyThread thread, Object wrapper, Object function, Object arg, long unblocker, long unblockerArg,
@CachedLibrary("wrapper") InteropLibrary receivers,
@Cached TranslateInteropExceptionNode translateInteropExceptionNode,
@Bind("this") Node node,
@Cached("new(node, receivers, translateInteropExceptionNode)") BlockingCallInterruptible blockingCallInterruptible) {
final ThreadManager threadManager = getContext(node).getThreadManager();
var context = getContext(node);
final ThreadManager threadManager = context.getThreadManager();
final Interrupter interrupter;
if (unblocker == nil) {
if (unblocker == 0) {
interrupter = threadManager.getNativeCallInterrupter();
} else {
interrupter = makeInterrupter(getContext(node), unblockWrapper, unblocker, unblockerArg);
interrupter = new CExtInterrupter(unblocker, unblockerArg);
}

final Object[] args = { function, arg };
Expand All @@ -687,57 +681,19 @@ static Object call(
node);
}

@TruffleBoundary
private static Interrupter makeInterrupter(RubyContext context, Object wrapper, Object function,
Object argument) {
return new CExtInterrupter(context, wrapper, function, argument);
}

private static final class CExtInterrupter implements Interrupter {

private final RubyContext context;
private final Object wrapper;
private final Object function;
private final Object argument;
private final long function;
private final long argument;

public CExtInterrupter(RubyContext context, Object wrapper, Object function, Object argument) {
assert InteropLibrary.getUncached().isExecutable(wrapper);
this.context = context;
this.wrapper = wrapper;
public CExtInterrupter(long function, long argument) {
this.function = function;
this.argument = argument;
}

@Override
public void interrupt(Thread thread) {
final TruffleContext truffleContext = context.getEnv().getContext();
final boolean alreadyEntered = truffleContext.isEntered();
Object prev = null;
if (!alreadyEntered) {
// We need to enter the context to execute this unblocking action, as it runs on Sulong.
try {
if (context.getOptions().SINGLE_THREADED) {
throw new IllegalStateException("--single-threaded was passed");
}
prev = truffleContext.enter(null);
} catch (IllegalStateException e) { // Multi threaded access denied from Truffle
// Not in a context, so we cannot use TruffleLogger
context.getLogger().severe(
"could not unblock thread inside blocking call in C extension because " +
"the context does not allow multithreading (" + e.getMessage() + ")");
return;
}
}

try {
InteropLibrary.getUncached().execute(wrapper, function, argument);
} catch (InteropException e) {
throw CompilerDirectives.shouldNotReachHere(e);
} finally {
if (!alreadyEntered) {
truffleContext.leave(null, prev);
}
}
LibRubySignal.executeUnblockFunction(function, argument);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/signal/java/org/truffleruby/signal/LibRubySignal.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public static void loadLibrary(String rubyHome, String libSuffix) {

public static native void restoreSystemHandlerAndRaise(int signalNumber);

public static native void executeUnblockFunction(long function, long argument);

}

0 comments on commit 44cf286

Please sign in to comment.