Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - tick local executor #6121

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/bevy_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }

# other
bytemuck = "1.5"

[dev-dependencies]
crossbeam-channel = "0.5.0"
51 changes: 51 additions & 0 deletions crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ use bevy_utils::{Duration, HashSet, Instant};
use std::borrow::Cow;
use std::ops::Range;

#[cfg(not(target_arch = "wasm32"))]
use bevy_ecs::schedule::IntoSystemDescriptor;
#[cfg(not(target_arch = "wasm32"))]
use bevy_tasks::tick_global_task_pools_on_main_thread;

/// Adds core functionality to Apps.
#[derive(Default)]
pub struct CorePlugin;
Expand All @@ -35,6 +40,13 @@ impl Plugin for CorePlugin {
.unwrap_or_default()
.create_default_pools();

#[cfg(not(target_arch = "wasm32"))]
app.add_system_to_stage(
bevy_app::CoreStage::Last,
tick_global_task_pools_on_main_thread.at_end(),
);

app.register_type::<Entity>().register_type::<Name>();
app.register_type::<Entity>()
.register_type::<Name>()
.register_type::<Range<f32>>()
Expand Down Expand Up @@ -97,3 +109,42 @@ fn register_math_types(app: &mut App) {
/// Wraps to 0 when it reaches the maximum u32 value
#[derive(Default, Resource, Clone, Copy)]
pub struct FrameCount(pub u32);

#[cfg(test)]
mod tests {
use super::*;
use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

#[test]
fn runs_spawn_local_tasks() {
let mut app = App::new();
app.add_plugin(CorePlugin);

let (async_tx, async_rx) = crossbeam_channel::unbounded();
AsyncComputeTaskPool::get()
.spawn_local(async move {
async_tx.send(()).unwrap();
})
.detach();

let (compute_tx, compute_rx) = crossbeam_channel::unbounded();
ComputeTaskPool::get()
.spawn_local(async move {
compute_tx.send(()).unwrap();
})
.detach();

let (io_tx, io_rx) = crossbeam_channel::unbounded();
IoTaskPool::get()
.spawn_local(async move {
io_tx.send(()).unwrap();
})
.detach();

app.run();

async_rx.try_recv().unwrap();
compute_rx.try_recv().unwrap();
io_rx.try_recv().unwrap();
}
}
2 changes: 2 additions & 0 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod single_threaded_task_pool;
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};

mod usages;
#[cfg(not(target_arch = "wasm32"))]
pub use usages::tick_global_task_pools_on_main_thread;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

mod iter;
Expand Down
33 changes: 29 additions & 4 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, pin};
use futures_lite::{future, pin, FutureExt};

use crate::Task;

Expand Down Expand Up @@ -117,9 +117,16 @@ impl TaskPool {

thread_builder
.spawn(move || {
let shutdown_future = ex.run(shutdown_rx.recv());
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
let tick_forever = async move {
loop {
local_executor.tick().await;
}
};
let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
});
})
.expect("Failed to spawn thread.")
})
Expand Down Expand Up @@ -306,6 +313,24 @@ impl TaskPool {
{
Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
Self::LOCAL_EXECUTOR.with(f)
}
}

impl Default for TaskPool {
Expand Down
26 changes: 26 additions & 0 deletions crates/bevy_tasks/src/usages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,29 @@ impl Deref for IoTaskPool {
&self.0
}
}

/// Used by `bevy_core` to tick the global tasks pools on the main thread.
/// This will run a maximum of 100 local tasks per executor per call to this function.
#[cfg(not(target_arch = "wasm32"))]
pub fn tick_global_task_pools_on_main_thread() {
COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|compute_local_executor| {
ASYNC_COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|async_local_executor| {
IO_TASK_POOL
.get()
.unwrap()
.with_local_executor(|io_local_executor| {
for _ in 0..100 {
compute_local_executor.try_tick();
async_local_executor.try_tick();
io_local_executor.try_tick();
}
});
});
});
}