Skip to content

Commit

Permalink
Don't panic on race condition in client info updating (#173)
Browse files Browse the repository at this point in the history
Co-authored-by: Hennadii Chernyshchyk <genaloner@gmail.com>
  • Loading branch information
UkoeHB and Shatur authored Jan 13, 2024
1 parent 8ac746c commit b96fc72
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Make `NetworkChannels` channel-creation methods public (`create_client_channel()` and `create_server_channel()`).
- Implement `Eq` and `PartialEq` on `EventType`.

### Fixed

- Don't panic when handling client acks if the ack references a despawned entity.

### Removed

- `LastChangeTick` resource, `ClientsInfo` should be used instead.
Expand Down
15 changes: 6 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub(super) mod clients_info;
pub mod clients_info;
pub(super) mod despawn_buffer;
pub(super) mod removal_buffer;
pub(super) mod replicated_archetypes_info;
Expand Down Expand Up @@ -336,9 +336,8 @@ fn collect_changes(
component,
)?;
} else {
let tick = *client_info
.ticks
.get(&entity.entity())
let tick = client_info
.get_change_limit(entity.entity())
.expect("entity should be present after adding component");
if ticks.is_changed(tick, change_tick.this_run()) {
update_message.write_component(
Expand All @@ -358,9 +357,7 @@ fn collect_changes(
// If there is any insertion or we must initialize, include all updates into init message
// and bump the last acknowledged tick to keep entity updates atomic.
init_message.take_entity_data(update_message)?;
client_info
.ticks
.insert(entity.entity(), change_tick.this_run());
client_info.set_change_limit(entity.entity(), change_tick.this_run());
} else {
update_message.end_entity_data()?;
}
Expand Down Expand Up @@ -420,7 +417,7 @@ fn collect_despawns(
for entity in despawn_buffer.drain(..) {
let mut shared_bytes = None;
for (message, _, client_info) in messages.iter_mut_with_info() {
client_info.ticks.remove(&entity);
client_info.remove_despawned(entity);
message.write_entity(&mut shared_bytes, entity)?;
}
}
Expand All @@ -446,7 +443,7 @@ fn collect_removals(
for (message, _, client_info) in messages.iter_mut_with_info() {
message.start_entity_data(entity);
for &replication_id in components {
client_info.ticks.insert(entity, tick);
client_info.set_change_limit(entity, tick);
message.write_replication_id(replication_id)?;
}
message.end_entity_data(false)?;
Expand Down
37 changes: 29 additions & 8 deletions src/server/clients_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ pub(crate) struct ClientInfo {
/// Indicates whether the client connected in this tick.
pub(super) just_connected: bool,

/// Last acknowledged tick for each entity.
pub(super) ticks: EntityHashMap<Entity, Tick>,
/// Lowest tick for use in change detection for each entity.
ticks: EntityHashMap<Entity, Tick>,

/// The last tick in which a replicated entity was spawned, despawned, or gained/lost a component.
/// The last tick in which a replicated entity was spawned, despawned, or gained/lost a component from the perspective
/// of the client.
///
/// It should be included in update messages and server events to avoid needless waiting for the next init message to arrive.
pub(crate) change_tick: RepliconTick,
Expand Down Expand Up @@ -172,9 +173,22 @@ impl ClientInfo {
(update_index, &mut update_info.entities)
}

/// Sets the change limit for an entity that is replicated to this client.
///
/// The change limit is the reference point for determining if components on an entity have changed and
/// need to be replicated. Component changes older than the change limit are assumed to be acked by the client.
pub(super) fn set_change_limit(&mut self, entity: Entity, tick: Tick) {
self.ticks.insert(entity, tick);
}

/// Gets the change limit for an entity that is replicated to this client.
pub(super) fn get_change_limit(&mut self, entity: Entity) -> Option<Tick> {
self.ticks.get(&entity).copied()
}

/// Marks update with the specified index as acknowledged.
///
/// Ticks for all entities from this update will be set to the update's tick if it's higher.
/// Change limits for all entities from this update will be set to the update's tick if it's higher.
///
/// Keeps allocated memory in the buffers for reuse.
pub(super) fn acknowledge(
Expand All @@ -192,10 +206,10 @@ impl ClientInfo {
};

for entity in &update_info.entities {
let last_tick = self
.ticks
.get_mut(entity)
.expect("tick should be inserted on any component insertion");
let Some(last_tick) = self.ticks.get_mut(entity) else {
// We ignore missing entities, since they were probably despawned.
continue;
};

// Received tick could be outdated because we bump it
// if we detect any insertion on the entity in `collect_changes`.
Expand All @@ -212,6 +226,13 @@ impl ClientInfo {
);
}

/// Removes a despawned entity tracked by this client.
pub fn remove_despawned(&mut self, entity: Entity) {
self.ticks.remove(&entity);
// We don't clean up `self.updates` for efficiency reasons.
// `Self::acknowledge()` will properly ignore despawned entities.
}

/// Removes all updates older then `min_timestamp`.
///
/// Keeps allocated memory in the buffers for reuse.
Expand Down
1 change: 1 addition & 0 deletions tests/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ fn despawn_update_replication() {

server_app.update();
client_app.update();
server_app.update(); // Let server receive an update to trigger acknowledgment.

assert!(client_app.world.entities().is_empty());
}
Expand Down

0 comments on commit b96fc72

Please sign in to comment.