From 637bb5f9928b6b782306b0b1dea2a7120d6cb26c Mon Sep 17 00:00:00 2001 From: David Alsh Date: Fri, 21 Jun 2024 19:25:05 +1000 Subject: [PATCH] Added executor for Rust futures driven by Libuv --- .gitignore | 1 + Cargo.lock | 161 +++++++++++++++++++++- crates/neon/Cargo.toml | 7 +- crates/neon/src/asynch/libuv.rs | 26 ++++ crates/neon/src/asynch/mod.rs | 6 + crates/neon/src/asynch/root.rs | 76 ++++++++++ crates/neon/src/asynch/runtime.rs | 64 +++++++++ crates/neon/src/context/mod.rs | 109 +++++++++++++++ crates/neon/src/lib.rs | 2 + crates/neon/src/sys/bindings/functions.rs | 5 + crates/neon/src/sys/bindings/libuv.rs | 0 crates/neon/src/sys/bindings/mod.rs | 3 +- crates/neon/src/sys/bindings/types.rs | 13 ++ 13 files changed, 467 insertions(+), 6 deletions(-) create mode 100644 crates/neon/src/asynch/libuv.rs create mode 100644 crates/neon/src/asynch/mod.rs create mode 100644 crates/neon/src/asynch/root.rs create mode 100644 crates/neon/src/asynch/runtime.rs create mode 100644 crates/neon/src/sys/bindings/libuv.rs diff --git a/.gitignore b/.gitignore index 740828832..d012485bd 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ pkgs/create-neon/create-neon-manual-test-project test/cli/lib npm-debug.log rls*.log +.vscode diff --git a/Cargo.lock b/Cargo.lock index 065d7b326..62d51fad1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "alsh_libuv" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc181a3a77b87d8e49d24e02fe13d3371906bfcd13820db8cbb400e48d51e8b" +dependencies = [ + "bitflags 1.2.1", + "libuv-sys2", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -73,7 +83,7 @@ version = "0.65.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", "cexpr", "clang-sys", "lazy_static", @@ -90,11 +100,31 @@ dependencies = [ "which", ] +[[package]] +name = "bindgen" +version = "0.68.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726e4313eb6ec35d2730258ad4e15b547ee75d6afaa1361a922e78e59b7d8078" +dependencies = [ + "bitflags 2.4.1", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.57", +] + [[package]] name = "bitflags" -version = "1.3.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "bitflags" @@ -178,6 +208,95 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.57", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.11" @@ -297,6 +416,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libuv-sys2" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6125e1a220a5698a154ce76762d2ef8884baf9f77da7ceb8a3bd8c5ce27df343" +dependencies = [ + "bindgen 0.68.1", + "cc", + "pkg-config", +] + [[package]] name = "linkify" version = "0.10.0" @@ -373,10 +503,12 @@ dependencies = [ name = "neon" version = "1.0.0" dependencies = [ + "alsh_libuv", "anyhow", "aquamarine", "doc-comment", "easy-cast", + "futures", "getrandom", "libloading 0.8.1", "linkify", @@ -409,7 +541,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d252ccdd7b72dd5e92ab65471d6a26ba47375139a6071ead3dbf191f60de9903" dependencies = [ - "bindgen", + "bindgen 0.65.1", ] [[package]] @@ -506,6 +638,18 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -712,6 +856,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.2" diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index d15b86396..0dd570e5c 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -35,6 +35,9 @@ doc-comment = { version = "0.3.3", optional = true } send_wrapper = "0.6.0" serde = { version = "1.0.197", optional = true } serde_json = { version = "1.0.114", optional = true } +futures = { version = "0.3", optional = true } +# Fork of libuv bindings for Rust +libuv = { package = "alsh_libuv", version = "2", features = ["sys"], optional = true } [dependencies.tokio] version = "1.34.0" @@ -43,7 +46,7 @@ features = ["sync"] optional = true [features] -default = ["napi-8"] +default = ["napi-8", "futures"] # Enable extracting values by serializing to JSON serde = ["dep:serde", "dep:serde_json"] @@ -57,6 +60,8 @@ external-buffers = [] # https://github.com/neon-bindings/rfcs/pull/46 futures = ["tokio"] +asynch = ["dep:futures", "dep:libuv"] + # Enable low-level system APIs. The `sys` API allows augmenting the Neon API # from external crates. sys = [] diff --git a/crates/neon/src/asynch/libuv.rs b/crates/neon/src/asynch/libuv.rs new file mode 100644 index 000000000..9245f8683 --- /dev/null +++ b/crates/neon/src/asynch/libuv.rs @@ -0,0 +1,26 @@ +use std::mem::MaybeUninit; +use std::rc::Rc; + +use crate::context::internal::Env; +use crate::sys::bindings::get_uv_event_loop; +use libuv::sys::uv_loop_t; +use libuv::Loop; +use once_cell::unsync::OnceCell; + +thread_local! { + pub static LIB_UV: OnceCell> = OnceCell::new(); +} + +/// Gets a reference to Libuv +pub fn get_lib_uv<'a>(env: &Env) -> Rc { + LIB_UV.with(move |cell| { + cell.get_or_init(move || { + let mut result = MaybeUninit::uninit(); + unsafe { get_uv_event_loop(env.to_raw(), result.as_mut_ptr()) }; + let ptr = unsafe { *result.as_mut_ptr() }; + let ptr = ptr as *mut uv_loop_t; + Rc::new(unsafe { libuv::r#loop::Loop::from_external(ptr) }) + }) + .clone() + }) +} diff --git a/crates/neon/src/asynch/mod.rs b/crates/neon/src/asynch/mod.rs new file mode 100644 index 000000000..1b53d2052 --- /dev/null +++ b/crates/neon/src/asynch/mod.rs @@ -0,0 +1,6 @@ +//! This module extends Libuv to work as an executor for Rust futures +pub mod root; +mod libuv; +mod runtime; + +pub use runtime::*; diff --git a/crates/neon/src/asynch/root.rs b/crates/neon/src/asynch/root.rs new file mode 100644 index 000000000..3941e429e --- /dev/null +++ b/crates/neon/src/asynch/root.rs @@ -0,0 +1,76 @@ +/* + This is a basic method of persisting JavaScript values so + they are given a static lifetime and not collected by GC + + This is needed because not all JsValues can be called with .root() + and is probably temporary +*/ +use std::cell::RefCell; + +use crate::context::Context; +use crate::handle::Handle; +use crate::object::Object; +use crate::types::JsObject; +use crate::types::Value; +use once_cell::unsync::Lazy; + +thread_local! { + pub static THREAD_LOCAL_COUNT: Lazy> = Lazy::new(|| RefCell::new(0)); + pub static GLOBAL_KEY: Lazy = Lazy::new(|| { + let mut lower = [0; std::mem::size_of::()]; + getrandom::getrandom(&mut lower).expect("Unable to generate number"); + let lower = u16::from_ne_bytes(lower); + format!("__neon_root_cache_{}", lower) + }); +} + +fn ref_count_inc() -> usize { + THREAD_LOCAL_COUNT.with(|c| { + let mut c = c.borrow_mut(); + let current = (*c).clone(); + *c += 1; + current + }) +} + +pub fn root<'a>(cx: &mut impl Context<'a>) -> Handle<'a, JsObject> { + let global = cx.global_object(); + let key = GLOBAL_KEY.with(|k| cx.string(&*k.as_str())); + match global.get_opt(cx, key).unwrap() { + Some(obj) => obj, + None => { + let init = cx.empty_object(); + global.set(cx, key, init).unwrap(); + global.get_opt(cx, key).unwrap().unwrap() + } + } +} + +#[derive(Clone, Debug)] +pub struct RootGlobal { + inner: String, +} + +impl RootGlobal { + pub fn new<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> Self { + let index = ref_count_inc(); + let key_str = format!("{}", index); + let key = cx.string(&key_str); + let cache = root(cx); + cache.set(cx, key, value).unwrap(); + Self { inner: key_str } + } + + pub fn into_inner<'a, V: Value>(&self, cx: &mut impl Context<'a>) -> Handle<'a, V> { + let key = cx.string(&self.inner); + let cache = root(cx); + cache.get(cx, key).unwrap() + } + + pub fn remove<'a>(&self, cx: &mut impl Context<'a>) -> bool { + let key = cx.string(&self.inner); + let val = cx.undefined(); + let cache = root(cx); + cache.set(cx, key, val).unwrap() + } +} diff --git a/crates/neon/src/asynch/runtime.rs b/crates/neon/src/asynch/runtime.rs new file mode 100644 index 000000000..d6a6c477c --- /dev/null +++ b/crates/neon/src/asynch/runtime.rs @@ -0,0 +1,64 @@ +use std::cell::RefCell; +use std::future::Future; + +use crate::context::internal::Env; +use futures::task::LocalSpawnExt; +use futures::executor::LocalSpawner; +use futures::executor::LocalPool; +use once_cell::unsync::Lazy; + +use super::libuv::get_lib_uv; + +thread_local! { + static LOCAL_POOL: Lazy> = Lazy::new(|| RefCell::new(LocalPool::new())); + static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner()) ); + static TASK_COUNT: Lazy> = Lazy::new(|| Default::default() ); +} + +pub fn spawn_async_local(env: &Env, future: impl Future + 'static) { + SPAWNER.with(|ls| { + ls.spawn_local(async { + future.await; + task_count_dec(); + }) + .unwrap(); + }); + + // Delegate non-blocking polling of futures to libuv + if task_count_inc() != 0 { + return; + } + + // Idle handle refers to a libuv task that runs while "idling". + // This is not an idle state, rather an analogy to a car engine + let uv = get_lib_uv(env); + let mut task = uv.idle().unwrap(); + + // The idle task will conduct a non-blocking poll of all local futures + // and continue on pending futures allowing the poll to be non-blocking. + // This repeats until no more futures are pending in the local set. + task.start(move |mut task: libuv::IdleHandle| { + if task_count() != 0 { + LOCAL_POOL.with(|lp| lp.borrow_mut().run_until_stalled()); + } else { + task.stop().unwrap(); + } + }) + .unwrap(); +} + +fn task_count() -> usize { + TASK_COUNT.with(|c| *c.borrow_mut()) +} + +fn task_count_inc() -> usize { + let current = task_count(); + TASK_COUNT.with(|c| *c.borrow_mut() += 1); + current +} + +fn task_count_dec() -> usize { + let current = task_count(); + TASK_COUNT.with(|c| *c.borrow_mut() -= 1); + current +} diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 0d936ed3f..8e08be635 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,6 +145,11 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; +#[cfg(feature = "asynch")] +use crate::asynch::{root::RootGlobal, spawn_async_local}; +#[cfg(feature = "asynch")] +use futures::Future; + use crate::{ event::TaskBuilder, handle::Handle, @@ -290,6 +295,24 @@ pub trait Context<'a>: ContextInternal<'a> { result } + #[cfg(feature = "asynch")] + fn execute_async(&mut self, f: F) + where + Fut: Future, + F: FnOnce(AsyncContext) -> Fut + 'static, + { + use futures::Future; + + let env = self.env(); + + spawn_async_local(&env, async move { + let scope = unsafe { HandleScope::new(env.to_raw()) }; + let future = f(AsyncContext { env }); + future.await; + drop(scope); + }); + } + /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. /// /// Handles created in the new scope are kept alive only for the duration of the computation and cannot escape, with the exception of the result value, which is rooted in the outer context. @@ -593,6 +616,39 @@ impl<'a> ModuleContext<'a> { Ok(()) } + #[cfg(feature = "asynch")] + pub fn export_function_async<'b, F, V, Fut>(&mut self, key: &str, f: F) -> NeonResult<()> + where + Fut: Future>, + F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, + V: Value, + { + let wrapper = JsFunction::new(self, move |mut cx| { + let mut args = vec![]; + + while let Some(arg) = cx.argument_opt(args.len()) { + let arg = arg.as_value(&mut cx); + let arg = RootGlobal::new(&mut cx, arg); + args.push(arg); + } + + let (deferred, promise) = cx.promise(); + cx.execute_async(move |mut cx| async move { + let acx = AsyncFunctionContext { + env: cx.env(), + arguments: args, + }; + deferred.resolve(&mut cx, f(acx).await.unwrap()); + () + }); + + Ok(promise) + })?; + + self.exports.clone().set(self, key, wrapper)?; + Ok(()) + } + /// Exports a JavaScript value from a Neon module. pub fn export_value(&mut self, key: &str, val: Handle) -> NeonResult<()> { self.exports.clone().set(self, key, val)?; @@ -627,6 +683,22 @@ impl<'a> ContextInternal<'a> for ExecuteContext<'a> { impl<'a> Context<'a> for ExecuteContext<'a> {} +/// An execution context of a scope created by [`Context::execute_async()`](Context::execute_async). +#[cfg(feature = "asynch")] +pub struct AsyncContext { + env: Env, +} + +#[cfg(feature = "asynch")] +impl<'a> ContextInternal<'static> for AsyncContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "asynch")] +impl Context<'static> for AsyncContext {} + /// An execution context of a scope created by [`Context::compute_scoped()`](Context::compute_scoped). pub struct ComputeContext<'a> { env: Env, @@ -773,6 +845,43 @@ impl<'a> ContextInternal<'a> for FunctionContext<'a> { impl<'a> Context<'a> for FunctionContext<'a> {} +/// An execution context of an async function call. +/// +/// The type parameter `T` is the type of the `this`-binding. +#[cfg(feature = "asynch")] +pub struct AsyncFunctionContext { + env: Env, + arguments: Vec, +} + +#[cfg(feature = "asynch")] +impl<'a> AsyncFunctionContext { + pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { + let arg = self.arguments.get(i).unwrap().clone(); + let handle = arg.into_inner(self); + Ok(handle) + } +} + +#[cfg(feature = "asynch")] +impl<'a> ContextInternal<'a> for AsyncFunctionContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "asynch")] +impl<'a> Context<'a> for AsyncFunctionContext {} + +#[cfg(feature = "asynch")] +impl Drop for AsyncFunctionContext { + fn drop(&mut self) { + while let Some(arg) = self.arguments.pop() { + arg.remove(self); + } + } +} + /// An execution context of a task completion callback. pub struct TaskContext<'a> { env: Env, diff --git a/crates/neon/src/lib.rs b/crates/neon/src/lib.rs index a471b59b5..fb6220219 100644 --- a/crates/neon/src/lib.rs +++ b/crates/neon/src/lib.rs @@ -78,6 +78,8 @@ //! [supported]: https://github.com/neon-bindings/neon#platform-support #![cfg_attr(docsrs, feature(doc_cfg))] +#[cfg(feature = "asynch")] +mod asynch; pub mod context; pub mod event; pub mod handle; diff --git a/crates/neon/src/sys/bindings/functions.rs b/crates/neon/src/sys/bindings/functions.rs index 55bc5038b..c89be36ac 100644 --- a/crates/neon/src/sys/bindings/functions.rs +++ b/crates/neon/src/sys/bindings/functions.rs @@ -266,6 +266,11 @@ mod napi4 { generate!( #[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] extern "C" { + fn get_uv_event_loop( + env: Env, + uv_loop: *mut UvEventLoop, + ); + fn create_threadsafe_function( env: Env, func: Value, diff --git a/crates/neon/src/sys/bindings/libuv.rs b/crates/neon/src/sys/bindings/libuv.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/neon/src/sys/bindings/mod.rs b/crates/neon/src/sys/bindings/mod.rs index cdd8559e1..3b806fe4e 100644 --- a/crates/neon/src/sys/bindings/mod.rs +++ b/crates/neon/src/sys/bindings/mod.rs @@ -42,7 +42,7 @@ macro_rules! napi_name { /// ```ignore /// extern "C" { /// fn get_undefined(env: Env, result: *mut Value) -> Status; -/// /* Additional functions may be included */ +/// /* Additional functions may be included */ /// } /// ``` /// @@ -177,3 +177,4 @@ pub use self::{functions::*, types::*}; mod functions; mod types; +mod libuv; diff --git a/crates/neon/src/sys/bindings/types.rs b/crates/neon/src/sys/bindings/types.rs index a167c9282..d3b2aead0 100644 --- a/crates/neon/src/sys/bindings/types.rs +++ b/crates/neon/src/sys/bindings/types.rs @@ -66,6 +66,19 @@ pub struct Ref__ { /// [`napi_ref`](https://nodejs.org/api/n-api.html#napi_ref) pub type Ref = *mut Ref__; +#[cfg(feature = "napi-4")] +#[repr(C)] +#[derive(Debug, Copy, Clone)] +#[doc(hidden)] +pub struct UvEventLoop__ { + _unused: [u8; 0], +} + +#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] +#[cfg(feature = "napi-4")] +/// [`napi_threadsafe_function`](https://nodejs.org/api/n-api.html#napi_threadsafe_function) +pub type UvEventLoop = *mut UvEventLoop__; + #[cfg(feature = "napi-4")] #[repr(C)] #[derive(Debug, Copy, Clone)]