From dd46ed9046551ebc9c425bf5699d4e38f0868753 Mon Sep 17 00:00:00 2001 From: Benoit Daloze Date: Mon, 8 Jan 2024 12:25:56 +0100 Subject: [PATCH 1/2] Cleanup --- spec/ruby/optional/capi/ext/thread_spec.c | 4 ++-- .../org/truffleruby/core/thread/ThreadNodes.java | 12 +++--------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/spec/ruby/optional/capi/ext/thread_spec.c b/spec/ruby/optional/capi/ext/thread_spec.c index 14bd20795402..47e2ca828cdb 100644 --- a/spec/ruby/optional/capi/ext/thread_spec.c +++ b/spec/ruby/optional/capi/ext/thread_spec.c @@ -69,7 +69,7 @@ static VALUE thread_spec_rb_thread_call_without_gvl(VALUE self) { } /* This is unblocked by a signal. */ -static void* blocking_gvl_func_for_udf_io(void *data) { +static void* blocking_gvl_func_for_ubf_io(void *data) { int rfd = (int)(size_t)data; char dummy; @@ -89,7 +89,7 @@ static VALUE thread_spec_rb_thread_call_without_gvl_with_ubf_io(VALUE self) { rb_raise(rb_eRuntimeError, "could not create pipe"); } - ret = rb_thread_call_without_gvl(blocking_gvl_func_for_udf_io, + ret = rb_thread_call_without_gvl(blocking_gvl_func_for_ubf_io, (void*)(size_t)fds[0], RUBY_UBF_IO, 0); close(fds[0]); close(fds[1]); diff --git a/src/main/java/org/truffleruby/core/thread/ThreadNodes.java b/src/main/java/org/truffleruby/core/thread/ThreadNodes.java index f9d970a3611e..80024deb2b5c 100644 --- a/src/main/java/org/truffleruby/core/thread/ThreadNodes.java +++ b/src/main/java/org/truffleruby/core/thread/ThreadNodes.java @@ -669,12 +669,13 @@ static Object call( @Cached TranslateInteropExceptionNode translateInteropExceptionNode, @Bind("this") Node node, @Cached("new(node, receivers, translateInteropExceptionNode)") BlockingCallInterruptible blockingCallInterruptible) { - final ThreadManager threadManager = getContext(node).getThreadManager(); + var context = getContext(node); + final ThreadManager threadManager = context.getThreadManager(); final Interrupter interrupter; if (unblocker == nil) { interrupter = threadManager.getNativeCallInterrupter(); } else { - interrupter = makeInterrupter(getContext(node), unblockWrapper, unblocker, unblockerArg); + interrupter = new CExtInterrupter(context, unblockWrapper, unblocker, unblockerArg); } final Object[] args = { function, arg }; @@ -687,12 +688,6 @@ static Object call( node); } - @TruffleBoundary - private static Interrupter makeInterrupter(RubyContext context, Object wrapper, Object function, - Object argument) { - return new CExtInterrupter(context, wrapper, function, argument); - } - private static final class CExtInterrupter implements Interrupter { private final RubyContext context; @@ -721,7 +716,6 @@ public void interrupt(Thread thread) { } prev = truffleContext.enter(null); } catch (IllegalStateException e) { // Multi threaded access denied from Truffle - // Not in a context, so we cannot use TruffleLogger context.getLogger().severe( "could not unblock thread inside blocking call in C extension because " + "the context does not allow multithreading (" + e.getMessage() + ")"); From 6bbd0de741928e4bb60c724cc9b3efca8d965bb4 Mon Sep 17 00:00:00 2001 From: Benoit Daloze Date: Tue, 9 Jan 2024 19:22:19 +0100 Subject: [PATCH 2/2] Call unblock functions using JNI in CExtInterrupter * We cannot enter TruffleContext on a system thread, so that leaves us with two options: * Calling the unblock function with JNI, and expect the unblock function does not call back to Ruby (not the case for all unblock functions I saw). * Calling the unblock function on a new Ruby Thread per context and communicate via a queue, but this seems very high overhead for every guest safepoint. * Fixes https://github.com/oracle/truffleruby/issues/3013 --- CHANGELOG.md | 1 + lib/cext/ABI_check.txt | 2 +- lib/truffle/truffle/cext.rb | 7 +- spec/tags/optional/capi/thread_tags.txt | 1 + spec/tags/truffle/capi/thread_tags.txt | 4 + spec/tags/truffle/capi/unimplemented_tags.txt | 1 + .../capi/ext/truffleruby_thread_spec.c | 85 +++++++++++++++++++ spec/truffle/capi/thread_spec.rb | 18 +++- spec/truffle/capi/unimplemented_spec.rb | 10 +-- spec/truffleruby.mspec | 4 + src/main/c/cext/call.c | 2 +- src/main/c/rubysignal/src/rubysignal.c | 9 ++ .../truffleruby/core/thread/ThreadNodes.java | 54 ++---------- .../org/truffleruby/signal/LibRubySignal.java | 2 + 14 files changed, 144 insertions(+), 56 deletions(-) create mode 100644 spec/tags/optional/capi/thread_tags.txt create mode 100644 spec/tags/truffle/capi/thread_tags.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index 15a91ee762a6..33de55df4c89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Bug fixes: * Fix some C API functions which were failing when called with Ruby values represented as Java primitives (#3352, @eregon). * Fix `IO.select([io], nil, [io])` on macOS, it was hanging due to a bug in macOS `poll(2)` (#3346, @eregon, @andrykonchin). * Run context cleanup such as showing the output of tools when `SignalException` and `Interrupt` escape (@eregon). +* Fix using the `--cpusampler` profiler when there are custom unblock functions for `rb_thread_call_without_gvl()` (#3013, @eregon). Compatibility: diff --git a/lib/cext/ABI_check.txt b/lib/cext/ABI_check.txt index 7f8f011eb73d..45a4fb75db86 100644 --- a/lib/cext/ABI_check.txt +++ b/lib/cext/ABI_check.txt @@ -1 +1 @@ -7 +8 diff --git a/lib/truffle/truffle/cext.rb b/lib/truffle/truffle/cext.rb index 0e189e7db29a..c6c1559c2db9 100644 --- a/lib/truffle/truffle/cext.rb +++ b/lib/truffle/truffle/cext.rb @@ -1933,9 +1933,14 @@ def rb_thread_call_without_gvl(function, data1, unblock, data2) end private def rb_thread_call_without_gvl_inner(function, data1, unblock, data2) + if SULONG + Truffle::Interop.to_native(unblock) + Truffle::Interop.to_native(data2) + end + Primitive.call_with_unblocking_function(Thread.current, POINTER_TO_POINTER_WRAPPER, function, data1, - POINTER_TO_VOID_WRAPPER, unblock, data2) + Truffle::Interop.as_pointer(unblock), Truffle::Interop.as_pointer(data2)) end def rb_iterate(iteration, iterated_object, callback, callback_arg) diff --git a/spec/tags/optional/capi/thread_tags.txt b/spec/tags/optional/capi/thread_tags.txt new file mode 100644 index 000000000000..3112893eff01 --- /dev/null +++ b/spec/tags/optional/capi/thread_tags.txt @@ -0,0 +1 @@ +sulong(would need to enter the context to call the unblock function):C-API Thread function rb_thread_call_without_gvl runs a C function with the global lock unlocked and can be woken by a signal diff --git a/spec/tags/truffle/capi/thread_tags.txt b/spec/tags/truffle/capi/thread_tags.txt new file mode 100644 index 000000000000..153edafb4c83 --- /dev/null +++ b/spec/tags/truffle/capi/thread_tags.txt @@ -0,0 +1,4 @@ +slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler +slow:TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler +sulong(safepoint inside C code instead):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with RUBY_UBF_IO when using CPUSampler +sulong(would need to enter the context to call the unblock function):TruffleRuby C-API Thread function rb_thread_call_without_gvl is unblocked with a custom unblock function when using CPUSampler diff --git a/spec/tags/truffle/capi/unimplemented_tags.txt b/spec/tags/truffle/capi/unimplemented_tags.txt index 88e88ac1bd58..af97209e310d 100644 --- a/spec/tags/truffle/capi/unimplemented_tags.txt +++ b/spec/tags/truffle/capi/unimplemented_tags.txt @@ -1 +1,2 @@ slow:Unimplemented functions in the C-API abort the process and show an error including the function name +sulong(different behavior):Unimplemented functions in the C-API abort the process and show an error including the function name diff --git a/spec/truffle/capi/ext/truffleruby_thread_spec.c b/spec/truffle/capi/ext/truffleruby_thread_spec.c index 35fde3426ff9..c75d8940c337 100644 --- a/spec/truffle/capi/ext/truffleruby_thread_spec.c +++ b/spec/truffle/capi/ext/truffleruby_thread_spec.c @@ -11,6 +11,10 @@ #include "ruby/thread.h" #include "rubyspec.h" +#include +#include +#include + #ifdef __cplusplus extern "C" { #endif @@ -21,9 +25,90 @@ static VALUE thread_spec_rb_thread_call_without_gvl_native_function(VALUE self) return LONG2FIX(ret); } +static void* call_check_ints(void* arg) { + rb_thread_check_ints(); + return NULL; +} + +static void* block_sleep(void* arg) { + struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 }; + while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) { + // Similar to how ossl_pkey.c does it + rb_thread_call_with_gvl(call_check_ints, NULL); + } + return (void*) Qtrue; +} + +static VALUE thread_spec_rb_thread_call_without_gvl_unblock_signal(VALUE self) { + return (VALUE) rb_thread_call_without_gvl(block_sleep, NULL, RUBY_UBF_IO, NULL); +} + +static void* block(void* arg) { + int fd = *(int*)arg; + char buffer = ' '; + ssize_t r; + + while (true) { + ssize_t r = read(fd, &buffer, 1); + if (r == 1) { + if (buffer == 'D') { // done + return (void*) Qtrue; + } else if (buffer == 'U') { // unblock + // Similar to how ossl_pkey.c does it + rb_thread_call_with_gvl(call_check_ints, NULL); + continue; + } else { + return (void*) rb_str_new(&buffer, 1); + } + } else { + perror("read() in blocking function returned != 1"); + return (void*) Qfalse; + } + } +} + +static void unblock(void* arg) { + int fd = *(int*)arg; + char buffer = 'U'; + while (write(fd, &buffer, 1) == -1 && errno == EINTR) { + // retry + } +} + +static VALUE finish(void* arg) { + int fd = *(int*)arg; + + // Wait 1 second + struct timespec remaining = { .tv_sec = 1, .tv_nsec = 0 }; + while (nanosleep(&remaining, &remaining) == -1 && errno == EINTR) { + // Sleep the remaining amount + } + + char buffer = 'D'; + while (write(fd, &buffer, 1) == -1 && errno == EINTR) { + // retry + } + return Qtrue; +} + +static VALUE thread_spec_rb_thread_call_without_gvl_unblock_custom_function(VALUE self) { + int fds[2]; + if (pipe(fds) == -1) { + rb_raise(rb_eRuntimeError, "could not create pipe"); + } + + VALUE thread = rb_funcall(rb_block_proc(), rb_intern("call"), 1, INT2FIX(fds[1])); + + rb_thread_call_without_gvl(block, &fds[0], unblock, &fds[1]); + + return rb_funcall(thread, rb_intern("join"), 0); +} + void Init_truffleruby_thread_spec(void) { VALUE cls = rb_define_class("CApiTruffleRubyThreadSpecs", rb_cObject); rb_define_method(cls, "rb_thread_call_without_gvl_native_function", thread_spec_rb_thread_call_without_gvl_native_function, 0); + rb_define_method(cls, "rb_thread_call_without_gvl_unblock_signal", thread_spec_rb_thread_call_without_gvl_unblock_signal, 0); + rb_define_method(cls, "rb_thread_call_without_gvl_unblock_custom_function", thread_spec_rb_thread_call_without_gvl_unblock_custom_function, 0); } #ifdef __cplusplus diff --git a/spec/truffle/capi/thread_spec.rb b/spec/truffle/capi/thread_spec.rb index 85ec04b67b23..48dfa37b80ff 100644 --- a/spec/truffle/capi/thread_spec.rb +++ b/spec/truffle/capi/thread_spec.rb @@ -8,7 +8,7 @@ require_relative '../../ruby/optional/capi/spec_helper' -load_extension("truffleruby_thread") +extension_path = load_extension("truffleruby_thread") describe "TruffleRuby C-API Thread function" do before :each do @@ -19,5 +19,21 @@ it "runs a native function with the global lock unlocked" do @t.rb_thread_call_without_gvl_native_function.should == Process.pid end + + it "is unblocked with RUBY_UBF_IO when using CPUSampler" do + code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_signal" + out = ruby_exe(code, options: '--cpusampler') + out.should.include?('rb_thread_call_without_gvl_unblock_signal') + out.should.include?('rb_thread_call_without_gvl') + out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints + end + + it "is unblocked with a custom unblock function when using CPUSampler" do + code = "require #{extension_path.dump}; CApiTruffleRubyThreadSpecs.new.rb_thread_call_without_gvl_unblock_custom_function { |fd| Thread.new { sleep 1; IO.for_fd(fd, autoclose: false).write 'D' } }" + out = ruby_exe(code, options: '--cpusampler') + out.should.include?('rb_thread_call_without_gvl_unblock_custom_function') + out.should.include?('rb_thread_call_without_gvl') + out.should.include?('rb_thread_call_with_gvl') # which checks guest safepoints + end end end diff --git a/spec/truffle/capi/unimplemented_spec.rb b/spec/truffle/capi/unimplemented_spec.rb index 033aa2faa4c6..a261d8573884 100644 --- a/spec/truffle/capi/unimplemented_spec.rb +++ b/spec/truffle/capi/unimplemented_spec.rb @@ -11,11 +11,9 @@ extension_path = load_extension("unimplemented") describe "Unimplemented functions in the C-API" do - guard_not -> { Truffle::Boot.get_option('cexts-sulong') } do - it "abort the process and show an error including the function name" do - expected_status = platform_is(:darwin) ? :SIGABRT : 127 - out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status) - out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/ - end + it "abort the process and show an error including the function name" do + expected_status = platform_is(:darwin) ? :SIGABRT : 127 + out = ruby_exe('require ARGV[0]; CApiRbTrErrorSpecs.new.not_implemented_function("foo")', args: "#{extension_path} 2>&1", exit_status: expected_status) + out.should =~ /undefined symbol: rb_str_shared_replace|Symbol not found: _rb_str_shared_replace/ end end diff --git a/spec/truffleruby.mspec b/spec/truffleruby.mspec index 05121943d252..abf6b54cdca9 100644 --- a/spec/truffleruby.mspec +++ b/spec/truffleruby.mspec @@ -129,6 +129,10 @@ class MSpecScript else excludes << 'jvm' end + + if Truffle::Boot.get_option('cexts-sulong') + excludes << 'sulong' + end end if windows? diff --git a/src/main/c/cext/call.c b/src/main/c/cext/call.c index 01c9dae594be..417419bc22ec 100644 --- a/src/main/c/cext/call.c +++ b/src/main/c/cext/call.c @@ -146,7 +146,7 @@ void *rb_thread_call_with_gvl(gvl_call *function, void *data1) { void* rb_thread_call_without_gvl(gvl_call *function, void *data1, rb_unblock_function_t *unblock_function, void *data2) { if (unblock_function == RUBY_UBF_IO) { - unblock_function = (rb_unblock_function_t*) rb_tr_unwrap(Qnil); + unblock_function = (rb_unblock_function_t*) NULL; } return polyglot_invoke(RUBY_CEXT, "rb_thread_call_without_gvl", function, data1, unblock_function, data2); diff --git a/src/main/c/rubysignal/src/rubysignal.c b/src/main/c/rubysignal/src/rubysignal.c index 3b35e79ce67b..3597e1a5066f 100644 --- a/src/main/c/rubysignal/src/rubysignal.c +++ b/src/main/c/rubysignal/src/rubysignal.c @@ -50,3 +50,12 @@ JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_restoreSystemHa signal(signo, SIG_DFL); raise(signo); } + +// Declaration copied from lib/cext/include/ruby/internal/intern/thread.h +typedef void rb_unblock_function_t(void *); + +JNIEXPORT void JNICALL Java_org_truffleruby_signal_LibRubySignal_executeUnblockFunction(JNIEnv *env, jclass clazz, jlong function, jlong argument) { + rb_unblock_function_t* unblock_function = (rb_unblock_function_t*) function; + void* arg = (void*) argument; + unblock_function(arg); +} diff --git a/src/main/java/org/truffleruby/core/thread/ThreadNodes.java b/src/main/java/org/truffleruby/core/thread/ThreadNodes.java index 80024deb2b5c..ac5621fd0f7e 100644 --- a/src/main/java/org/truffleruby/core/thread/ThreadNodes.java +++ b/src/main/java/org/truffleruby/core/thread/ThreadNodes.java @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import com.oracle.truffle.api.RootCallTarget; -import com.oracle.truffle.api.TruffleContext; import com.oracle.truffle.api.TruffleSafepoint; import com.oracle.truffle.api.TruffleSafepoint.Interrupter; import com.oracle.truffle.api.dsl.Bind; @@ -109,13 +108,13 @@ import com.oracle.truffle.api.dsl.Cached; import com.oracle.truffle.api.dsl.ImportStatic; import com.oracle.truffle.api.dsl.Specialization; -import com.oracle.truffle.api.interop.InteropException; import com.oracle.truffle.api.interop.InteropLibrary; import com.oracle.truffle.api.library.CachedLibrary; import com.oracle.truffle.api.nodes.Node; import com.oracle.truffle.api.object.Shape; import com.oracle.truffle.api.profiles.BranchProfile; import com.oracle.truffle.api.source.SourceSection; +import org.truffleruby.signal.LibRubySignal; import static org.truffleruby.language.SafepointPredicate.ALL_THREADS_AND_FIBERS; @@ -658,13 +657,7 @@ public abstract static class CallWithUnblockingFunctionNode extends PrimitiveArr @SuppressWarnings("truffle-neverdefault") // GR-43642 @Specialization(limit = "getCacheLimit()") static Object call( - RubyThread thread, - Object wrapper, - Object function, - Object arg, - Object unblockWrapper, - Object unblocker, - Object unblockerArg, + RubyThread thread, Object wrapper, Object function, Object arg, long unblocker, long unblockerArg, @CachedLibrary("wrapper") InteropLibrary receivers, @Cached TranslateInteropExceptionNode translateInteropExceptionNode, @Bind("this") Node node, @@ -672,10 +665,10 @@ static Object call( var context = getContext(node); final ThreadManager threadManager = context.getThreadManager(); final Interrupter interrupter; - if (unblocker == nil) { + if (unblocker == 0) { interrupter = threadManager.getNativeCallInterrupter(); } else { - interrupter = new CExtInterrupter(context, unblockWrapper, unblocker, unblockerArg); + interrupter = new CExtInterrupter(unblocker, unblockerArg); } final Object[] args = { function, arg }; @@ -690,48 +683,17 @@ static Object call( private static final class CExtInterrupter implements Interrupter { - private final RubyContext context; - private final Object wrapper; - private final Object function; - private final Object argument; + private final long function; + private final long argument; - public CExtInterrupter(RubyContext context, Object wrapper, Object function, Object argument) { - assert InteropLibrary.getUncached().isExecutable(wrapper); - this.context = context; - this.wrapper = wrapper; + public CExtInterrupter(long function, long argument) { this.function = function; this.argument = argument; } @Override public void interrupt(Thread thread) { - final TruffleContext truffleContext = context.getEnv().getContext(); - final boolean alreadyEntered = truffleContext.isEntered(); - Object prev = null; - if (!alreadyEntered) { - // We need to enter the context to execute this unblocking action, as it runs on Sulong. - try { - if (context.getOptions().SINGLE_THREADED) { - throw new IllegalStateException("--single-threaded was passed"); - } - prev = truffleContext.enter(null); - } catch (IllegalStateException e) { // Multi threaded access denied from Truffle - context.getLogger().severe( - "could not unblock thread inside blocking call in C extension because " + - "the context does not allow multithreading (" + e.getMessage() + ")"); - return; - } - } - - try { - InteropLibrary.getUncached().execute(wrapper, function, argument); - } catch (InteropException e) { - throw CompilerDirectives.shouldNotReachHere(e); - } finally { - if (!alreadyEntered) { - truffleContext.leave(null, prev); - } - } + LibRubySignal.executeUnblockFunction(function, argument); } @Override diff --git a/src/signal/java/org/truffleruby/signal/LibRubySignal.java b/src/signal/java/org/truffleruby/signal/LibRubySignal.java index b042bab92037..494019f217a8 100644 --- a/src/signal/java/org/truffleruby/signal/LibRubySignal.java +++ b/src/signal/java/org/truffleruby/signal/LibRubySignal.java @@ -26,4 +26,6 @@ public static void loadLibrary(String rubyHome, String libSuffix) { public static native void restoreSystemHandlerAndRaise(int signalNumber); + public static native void executeUnblockFunction(long function, long argument); + }