From 22d3d807e1bbd0ea357f0b05342d59b0b9370013 Mon Sep 17 00:00:00 2001 From: Zakarum Date: Fri, 12 Jul 2024 16:29:00 +0200 Subject: [PATCH] Prevent flows to run multiple times per execute Add ability to enter flow context and execute closure --- src/flow/entity.rs | 14 +++++----- src/flow/mod.rs | 63 +++++++++++++++++++++++++++++++++------------ src/query/option.rs | 11 ++++---- 3 files changed, 59 insertions(+), 29 deletions(-) diff --git a/src/flow/entity.rs b/src/flow/entity.rs index 68a7db1..ee43009 100644 --- a/src/flow/entity.rs +++ b/src/flow/entity.rs @@ -9,9 +9,9 @@ use core::{ use crate::{ bundle::{Bundle, DynamicBundle, DynamicComponentBundle}, component::Component, - entity::{EntityBound, EntityId, EntityLoc, EntityRef}, + entity::{Entity, EntityBound, EntityId, EntityLoc, EntityRef, EntitySet, Location}, query::{DefaultQuery, IntoQuery, Query, QueryItem}, - world::WorldLocal, + world::{World, WorldLocal}, EntityError, NoSuchEntity, }; @@ -28,29 +28,29 @@ pub struct FlowEntity { marker: PhantomData &'static mut WorldLocal>, } -impl crate::entity::Entity for FlowEntity { +impl Entity for FlowEntity { #[inline(always)] fn id(&self) -> EntityId { self.id } #[inline(always)] - fn lookup(&self, entities: &crate::entity::EntitySet) -> Option { + fn lookup(&self, entities: &EntitySet) -> Option { self.id.lookup(entities) } #[inline(always)] - fn is_alive(&self, entities: &crate::entity::EntitySet) -> bool { + fn is_alive(&self, entities: &EntitySet) -> bool { self.id.is_alive(entities) } #[inline(always)] - fn entity_loc<'a>(&self, entities: &'a crate::entity::EntitySet) -> Option> { + fn entity_loc<'a>(&self, entities: &'a EntitySet) -> Option> { self.id.entity_loc(entities) } #[inline(always)] - fn entity_ref<'a>(&self, world: &'a mut crate::world::World) -> Option> { + fn entity_ref<'a>(&self, world: &'a mut World) -> Option> { self.id.entity_ref(world) } } diff --git a/src/flow/mod.rs b/src/flow/mod.rs index 201faaa..7815a12 100644 --- a/src/flow/mod.rs +++ b/src/flow/mod.rs @@ -26,10 +26,16 @@ use core::{ any::TypeId, pin::Pin, + sync::atomic::{AtomicBool, Ordering}, task::{Context, Poll, Waker}, }; -use alloc::{boxed::Box, sync::Arc, task::Wake, vec::Vec}; +use alloc::{ + boxed::Box, + sync::{Arc, Weak}, + task::Wake, + vec::Vec, +}; use amity::{flip_queue::FlipQueue, ring_buffer::RingBuffer}; use hashbrown::HashMap; @@ -39,7 +45,7 @@ use crate::{ entity::{EntityId, EntityRef}, system::State, type_id, - world::WorldLocal, + world::{World, WorldLocal}, }; mod entity; @@ -137,11 +143,14 @@ where if let Some(flow) = make_flow.make_flow() { let task_id = typed_flows.array.vacant_key(); + let needs_wake = Arc::new(AtomicBool::new(false)); let task = FlowTask { flow: Box::pin(flow), + needs_wake: needs_wake.clone(), waker: Waker::from(Arc::new(FlowWaker { task_id, - queue: queue.queue.clone(), + flip: Arc::downgrade(&queue.flip), + needs_wake, })), }; @@ -204,21 +213,31 @@ impl dyn AnyFlows { struct FlowWaker { task_id: usize, - queue: Arc>, + needs_wake: Arc, + flip: Weak>, } impl Wake for FlowWaker { + #[inline(always)] fn wake(self: Arc) { - self.queue.push(self.task_id); + self.wake_by_ref(); } + #[inline(always)] fn wake_by_ref(self: &Arc) { - self.queue.push(self.task_id); + if self.needs_wake.fetch_and(false, Ordering::Acquire) { + return; + } + let Some(flip) = self.flip.upgrade() else { + return; + }; + flip.push(self.task_id); } } struct FlowTask { flow: Pin>, + needs_wake: Arc, waker: Waker, } @@ -238,6 +257,8 @@ where continue; }; + task.needs_wake.store(true, Ordering::Relaxed); + let mut cx = Context::from_waker(&task.waker); // Safety: This is the only code that can access `task.flow`. @@ -276,7 +297,7 @@ where /// Queue of flows of a single type. struct AnyQueue { - queue: Arc>, + flip: Arc>, ready: RingBuffer, flows: Box, } @@ -287,7 +308,7 @@ impl AnyQueue { F: Flow + 'static, { AnyQueue { - queue: Arc::new(FlipQueue::new()), + flip: Arc::new(FlipQueue::new()), ready: RingBuffer::new(), flows: Box::new(TypedFlows:: { array: Slab::new() }), } @@ -319,10 +340,7 @@ impl Flows { } } - fn collect_new_flows<'a>( - &mut self, - world: &'a mut crate::world::World, - ) -> Option> { + fn collect_new_flows<'a>(&mut self, world: &'a mut World) -> Option> { let world = world.local(); core::mem::swap(&mut self.new_flows, world.new_flows.get_mut()); @@ -332,7 +350,7 @@ impl Flows { // First swap all queues with ready buffer. for typed in self.map.values_mut() { debug_assert!(typed.ready.is_empty()); - typed.queue.swap_buffer(&mut typed.ready); + typed.flip.swap_buffer(&mut typed.ready); } // Then drain all new flows into queues. @@ -348,7 +366,7 @@ impl Flows { /// /// Flows spawned in the world are drained into this instance, /// so this function should be called with the same world instance. - pub fn execute(&mut self, world: &mut crate::world::World) { + pub fn execute(&mut self, world: &mut World) { world.maintenance(); let Some(_guard) = self.collect_new_flows(world) else { @@ -366,11 +384,24 @@ impl Flows { typed.ready.clear(); } } + + /// Enter flow context and execute the closure with the [`FlowWorld`] instance. + /// + /// Closure may use any [`FlowWorld`] and [`FlowEntity`] values. + pub fn enter(world: &mut World, f: F) -> R + where + F: FnOnce(FlowWorld) -> R, + { + let guard = tls::WorldGuard::new(world.local()); + let r = f(FlowWorld::new()); + drop(guard); + r + } } /// Function that can be used as a [`System`](crate::system::System) /// to execute flows in the ECS world. -pub fn flows_system(world: &mut crate::world::World, mut flows: State) { +pub fn flows_system(world: &mut World, mut flows: State) { let flows = &mut *flows; flows.execute(world); } @@ -412,7 +443,7 @@ where } } -impl crate::world::World { +impl World { /// Spawns a flow in the world. /// It will be polled during [`Flows::execute`] until completion. pub fn spawn_flow(&mut self, flow: F) diff --git a/src/query/option.rs b/src/query/option.rs index d76fb6f..482c596 100644 --- a/src/query/option.rs +++ b/src/query/option.rs @@ -1,6 +1,9 @@ use core::any::TypeId; -use crate::{archetype::Archetype, component::ComponentInfo, epoch::EpochId, system::QueryArg}; +use crate::{ + archetype::Archetype, component::ComponentInfo, entity::EntityId, epoch::EpochId, + system::QueryArg, +}; use super::{ Access, AsQuery, BatchFetch, DefaultQuery, Fetch, ImmutableQuery, IntoQuery, Query, SendQuery, @@ -170,11 +173,7 @@ where } } - fn reserved_entity_item<'a>( - &self, - id: crate::entity::EntityId, - idx: u32, - ) -> Option>> { + fn reserved_entity_item<'a>(&self, id: EntityId, idx: u32) -> Option>> { Some(self.0.reserved_entity_item(id, idx)) } }