Skip to content

Commit

Permalink
Prevent flows to run multiple times per execute
Browse files Browse the repository at this point in the history
Add ability to enter flow context and execute closure
  • Loading branch information
zakarumych committed Jul 12, 2024
1 parent 0bf4236 commit 22d3d80
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
14 changes: 7 additions & 7 deletions src/flow/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use core::{
use crate::{
bundle::{Bundle, DynamicBundle, DynamicComponentBundle},
component::Component,
entity::{EntityBound, EntityId, EntityLoc, EntityRef},
entity::{Entity, EntityBound, EntityId, EntityLoc, EntityRef, EntitySet, Location},
query::{DefaultQuery, IntoQuery, Query, QueryItem},
world::WorldLocal,
world::{World, WorldLocal},
EntityError, NoSuchEntity,
};

Expand All @@ -28,29 +28,29 @@ pub struct FlowEntity {
marker: PhantomData<fn() -> &'static mut WorldLocal>,
}

impl crate::entity::Entity for FlowEntity {
impl Entity for FlowEntity {
#[inline(always)]
fn id(&self) -> EntityId {
self.id
}

#[inline(always)]
fn lookup(&self, entities: &crate::entity::EntitySet) -> Option<crate::entity::Location> {
fn lookup(&self, entities: &EntitySet) -> Option<Location> {
self.id.lookup(entities)
}

#[inline(always)]
fn is_alive(&self, entities: &crate::entity::EntitySet) -> bool {
fn is_alive(&self, entities: &EntitySet) -> bool {
self.id.is_alive(entities)
}

#[inline(always)]
fn entity_loc<'a>(&self, entities: &'a crate::entity::EntitySet) -> Option<EntityLoc<'a>> {
fn entity_loc<'a>(&self, entities: &'a EntitySet) -> Option<EntityLoc<'a>> {
self.id.entity_loc(entities)
}

#[inline(always)]
fn entity_ref<'a>(&self, world: &'a mut crate::world::World) -> Option<EntityRef<'a>> {
fn entity_ref<'a>(&self, world: &'a mut World) -> Option<EntityRef<'a>> {
self.id.entity_ref(world)
}
}
Expand Down
63 changes: 47 additions & 16 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@
use core::{
any::TypeId,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, Waker},
};

use alloc::{boxed::Box, sync::Arc, task::Wake, vec::Vec};
use alloc::{
boxed::Box,
sync::{Arc, Weak},
task::Wake,
vec::Vec,
};

use amity::{flip_queue::FlipQueue, ring_buffer::RingBuffer};
use hashbrown::HashMap;
Expand All @@ -39,7 +45,7 @@ use crate::{
entity::{EntityId, EntityRef},
system::State,
type_id,
world::WorldLocal,
world::{World, WorldLocal},
};

mod entity;
Expand Down Expand Up @@ -137,11 +143,14 @@ where
if let Some(flow) = make_flow.make_flow() {
let task_id = typed_flows.array.vacant_key();

let needs_wake = Arc::new(AtomicBool::new(false));
let task = FlowTask {
flow: Box::pin(flow),
needs_wake: needs_wake.clone(),
waker: Waker::from(Arc::new(FlowWaker {
task_id,
queue: queue.queue.clone(),
flip: Arc::downgrade(&queue.flip),
needs_wake,
})),
};

Expand Down Expand Up @@ -204,21 +213,31 @@ impl dyn AnyFlows {

struct FlowWaker {
task_id: usize,
queue: Arc<FlipQueue<usize>>,
needs_wake: Arc<AtomicBool>,
flip: Weak<FlipQueue<usize>>,
}

impl Wake for FlowWaker {
#[inline(always)]
fn wake(self: Arc<Self>) {
self.queue.push(self.task_id);
self.wake_by_ref();
}

#[inline(always)]
fn wake_by_ref(self: &Arc<Self>) {
self.queue.push(self.task_id);
if self.needs_wake.fetch_and(false, Ordering::Acquire) {
return;
}
let Some(flip) = self.flip.upgrade() else {
return;
};
flip.push(self.task_id);
}
}

struct FlowTask<F> {
flow: Pin<Box<F>>,
needs_wake: Arc<AtomicBool>,
waker: Waker,
}

Expand All @@ -238,6 +257,8 @@ where
continue;
};

task.needs_wake.store(true, Ordering::Relaxed);

let mut cx = Context::from_waker(&task.waker);

// Safety: This is the only code that can access `task.flow`.
Expand Down Expand Up @@ -276,7 +297,7 @@ where

/// Queue of flows of a single type.
struct AnyQueue {
queue: Arc<FlipQueue<usize>>,
flip: Arc<FlipQueue<usize>>,
ready: RingBuffer<usize>,
flows: Box<dyn AnyFlows>,
}
Expand All @@ -287,7 +308,7 @@ impl AnyQueue {
F: Flow + 'static,
{
AnyQueue {
queue: Arc::new(FlipQueue::new()),
flip: Arc::new(FlipQueue::new()),
ready: RingBuffer::new(),
flows: Box::new(TypedFlows::<F> { array: Slab::new() }),
}
Expand Down Expand Up @@ -319,10 +340,7 @@ impl Flows {
}
}

fn collect_new_flows<'a>(
&mut self,
world: &'a mut crate::world::World,
) -> Option<tls::WorldGuard<'a>> {
fn collect_new_flows<'a>(&mut self, world: &'a mut World) -> Option<tls::WorldGuard<'a>> {
let world = world.local();

core::mem::swap(&mut self.new_flows, world.new_flows.get_mut());
Expand All @@ -332,7 +350,7 @@ impl Flows {
// First swap all queues with ready buffer.
for typed in self.map.values_mut() {
debug_assert!(typed.ready.is_empty());
typed.queue.swap_buffer(&mut typed.ready);
typed.flip.swap_buffer(&mut typed.ready);
}

// Then drain all new flows into queues.
Expand All @@ -348,7 +366,7 @@ impl Flows {
///
/// Flows spawned in the world are drained into this instance,
/// so this function should be called with the same world instance.
pub fn execute(&mut self, world: &mut crate::world::World) {
pub fn execute(&mut self, world: &mut World) {
world.maintenance();

let Some(_guard) = self.collect_new_flows(world) else {
Expand All @@ -366,11 +384,24 @@ impl Flows {
typed.ready.clear();
}
}

/// Enter flow context and execute the closure with the [`FlowWorld`] instance.
///
/// Closure may use any [`FlowWorld`] and [`FlowEntity`] values.
pub fn enter<F, R>(world: &mut World, f: F) -> R
where
F: FnOnce(FlowWorld) -> R,
{
let guard = tls::WorldGuard::new(world.local());
let r = f(FlowWorld::new());
drop(guard);
r
}
}

/// Function that can be used as a [`System`](crate::system::System)
/// to execute flows in the ECS world.
pub fn flows_system(world: &mut crate::world::World, mut flows: State<Flows>) {
pub fn flows_system(world: &mut World, mut flows: State<Flows>) {
let flows = &mut *flows;
flows.execute(world);
}
Expand Down Expand Up @@ -412,7 +443,7 @@ where
}
}

impl crate::world::World {
impl World {
/// Spawns a flow in the world.
/// It will be polled during [`Flows::execute`] until completion.
pub fn spawn_flow<F>(&mut self, flow: F)
Expand Down
11 changes: 5 additions & 6 deletions src/query/option.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use core::any::TypeId;

use crate::{archetype::Archetype, component::ComponentInfo, epoch::EpochId, system::QueryArg};
use crate::{
archetype::Archetype, component::ComponentInfo, entity::EntityId, epoch::EpochId,
system::QueryArg,
};

use super::{
Access, AsQuery, BatchFetch, DefaultQuery, Fetch, ImmutableQuery, IntoQuery, Query, SendQuery,
Expand Down Expand Up @@ -170,11 +173,7 @@ where
}
}

fn reserved_entity_item<'a>(
&self,
id: crate::entity::EntityId,
idx: u32,
) -> Option<Option<T::Item<'a>>> {
fn reserved_entity_item<'a>(&self, id: EntityId, idx: u32) -> Option<Option<T::Item<'a>>> {
Some(self.0.reserved_entity_item(id, idx))
}
}
Expand Down

0 comments on commit 22d3d80

Please sign in to comment.