Skip to content

Commit

Permalink
Track change tick for each client individually (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur authored Jan 12, 2024
1 parent 5de3a92 commit 8ac746c
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 110 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- API for custom server messages now uses `server_event::serialize_with` and `server_event::deserialize_with`. For more details see the example in the docs.
- Speedup serialization for multiple clients by reusing already serialized components and entities.
- Hide extra functionality from `ServerEventQueue`.
- Move server event reset system to new set `ClientSet::ResetEvents` in `PreUpdate`.
- Make `NetworkChannels` channel-creation methods public (`create_client_channel()` and `create_server_channel()`).
- Implement `Eq` and `PartialEq` on `EventType`.

### Removed

- `LastChangeTick` resource, `ClientsInfo` should be used instead.

## [0.19.0] - 2024-01-07

### Added
Expand Down
12 changes: 5 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ fn apply_replication(

let mut result = Ok(());
buffered_updates.0.retain(|update| {
if update.last_change_tick > replicon_tick {
if update.change_tick > replicon_tick {
return true;
}

Expand Down Expand Up @@ -276,11 +276,11 @@ fn apply_update_message(
stats.bytes += end_pos;
}

let (last_change_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?;
if last_change_tick > replicon_tick {
let (change_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?;
if change_tick > replicon_tick {
trace!("buffering update message for {message_tick:?}");
buffered_updates.0.push(BufferedUpdate {
last_change_tick,
change_tick,
message_tick,
message: message.slice(cursor.position() as usize..),
});
Expand Down Expand Up @@ -534,9 +534,7 @@ impl BufferedUpdates {
/// See also [`crate::server::replication_messages::UpdateMessage`].
pub(super) struct BufferedUpdate {
/// Required tick to wait for.
///
/// See also [`crate::server::LastChangeTick`].
last_change_tick: RepliconTick,
change_tick: RepliconTick,

/// The tick this update corresponds to.
message_tick: RepliconTick,
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ pub mod prelude {
NetworkChannels, ReplicationChannel, RepliconCorePlugin,
},
server::{
has_authority, ClientEntityMap, ClientMapping, LastChangeTick, ServerPlugin, ServerSet,
TickPolicy, SERVER_ID,
clients_info::ClientsInfo, has_authority, ClientEntityMap, ClientMapping, ServerPlugin,
ServerSet, TickPolicy, SERVER_ID,
},
ReplicationPlugins,
};
Expand Down
181 changes: 122 additions & 59 deletions src/network_event/server_event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::io::Cursor;

use bevy::{ecs::event::Event, prelude::*};
use bevy_renet::{
client_connected,
renet::{ClientId, RenetClient, RenetServer, SendType},
renet::{Bytes, ClientId, RenetClient, RenetServer, SendType},
};
use bincode::{DefaultOptions, Options};
use ordered_multimap::ListOrderedMultimap;
Expand All @@ -15,7 +17,10 @@ use crate::{
replicon_core::{
replication_rules::MapNetworkEntities, replicon_tick::RepliconTick, NetworkChannels,
},
server::{has_authority, LastChangeTick, ServerSet, SERVER_ID},
server::{
clients_info::{ClientInfo, ClientsInfo},
has_authority, ServerSet, SERVER_ID,
},
};

/// An extension trait for [`App`] for creating server events.
Expand Down Expand Up @@ -66,16 +71,17 @@ pub trait ServerEventAppExt {
fn sending_reflect_system(
mut server: ResMut<RenetServer>,
mut reflect_events: EventReader<ToClients<ReflectEvent>>,
last_change_tick: Res<LastChangeTick>,
clients_info: Res<ClientsInfo>,
channel: Res<ServerEventChannel<ReflectEvent>>,
registry: Res<AppTypeRegistry>,
) {
let registry = registry.read();
for ToClients { event, mode } in reflect_events.read() {
let message = serialize_reflect_event(**last_change_tick, &event, &registry)
.expect("server event should be serializable");
server_event::send(&mut server, *channel, *mode, message)
server_event::send_with(&mut server, &clients_info, *channel, *mode, |cursor| {
let serializer = ReflectSerializer::new(&*event.0, &registry);
DefaultOptions::new().serialize_into(cursor, &serializer)
})
.expect("server event should be serializable");
}
}
Expand All @@ -89,8 +95,13 @@ pub trait ServerEventAppExt {
) {
let registry = registry.read();
while let Some(message) = client.receive_message(*channel) {
let (tick, event) = deserialize_reflect_event(&message, &registry)
.expect("server should send valid events");
let (tick, event) = server_event::deserialize_with(&message, |cursor| {
let mut deserializer =
bincode::Deserializer::with_reader(cursor, DefaultOptions::new());
let reflect = UntypedReflectDeserializer::new(&registry).deserialize(&mut deserializer)?;
Ok(ReflectEvent(reflect))
})
.expect("server should send valid events");
// Event should be sent to the queue if replication message with its tick has not yet arrived.
if tick <= *replicon_tick {
Expand All @@ -103,32 +114,6 @@ pub trait ServerEventAppExt {
#[derive(Event)]
struct ReflectEvent(Box<dyn Reflect>);
fn serialize_reflect_event(
tick: RepliconTick,
event: &ReflectEvent,
registry: &TypeRegistry,
) -> bincode::Result<Vec<u8>> {
let mut message = Vec::new();
DefaultOptions::new().serialize_into(&mut message, &tick)?;
let serializer = ReflectSerializer::new(&*event.0, registry);
DefaultOptions::new().serialize_into(&mut message, &serializer)?;
Ok(message)
}
fn deserialize_reflect_event(
message: &[u8],
registry: &TypeRegistry,
) -> bincode::Result<(RepliconTick, ReflectEvent)> {
let mut cursor = Cursor::new(message);
let tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let mut deserializer =
bincode::Deserializer::with_reader(&mut cursor, DefaultOptions::new());
let reflect = UntypedReflectDeserializer::new(registry).deserialize(&mut deserializer)?;
Ok((tick, ReflectEvent(reflect)))
}
```
*/
fn add_server_event_with<T: Event, Marker1, Marker2>(
Expand Down Expand Up @@ -218,9 +203,10 @@ fn receiving_system<T: Event + DeserializeOwned>(
channel: Res<ServerEventChannel<T>>,
) {
while let Some(message) = client.receive_message(*channel) {
let (tick, event) = DefaultOptions::new()
.deserialize(&message)
.expect("server should send valid events");
let (tick, event) = deserialize_with(&message, |cursor| {
DefaultOptions::new().deserialize_from(cursor)
})
.expect("server should send valid events");

if tick <= *replicon_tick {
server_events.send(event);
Expand All @@ -239,9 +225,10 @@ fn receiving_and_mapping_system<T: Event + MapNetworkEntities + DeserializeOwned
channel: Res<ServerEventChannel<T>>,
) {
while let Some(message) = client.receive_message(*channel) {
let (tick, mut event): (_, T) = DefaultOptions::new()
.deserialize(&message)
.expect("server should send valid mapped events");
let (tick, mut event): (_, T) = deserialize_with(&message, |cursor| {
DefaultOptions::new().deserialize_from(cursor)
})
.expect("server should send valid events");

event.map_entities(&mut EventMapper(entity_map.to_client()));
if tick <= *replicon_tick {
Expand All @@ -255,15 +242,14 @@ fn receiving_and_mapping_system<T: Event + MapNetworkEntities + DeserializeOwned
fn sending_system<T: Event + Serialize>(
mut server: ResMut<RenetServer>,
mut server_events: EventReader<ToClients<T>>,
last_change_tick: Res<LastChangeTick>,
clients_info: Res<ClientsInfo>,
channel: Res<ServerEventChannel<T>>,
) {
for ToClients { event, mode } in server_events.read() {
let message = DefaultOptions::new()
.serialize(&(**last_change_tick, event))
.expect("server event should be serializable");

send(&mut server, *channel, *mode, message);
send_with(&mut server, &clients_info, *channel, *mode, |cursor| {
DefaultOptions::new().serialize_into(cursor, &event)
})
.expect("server event should be serializable");
}
}

Expand Down Expand Up @@ -305,33 +291,110 @@ fn reset_system<T: Event>(mut event_queue: ResMut<ServerEventQueue<T>>) {
event_queue.0.clear();
}

/// Sends serialized `message` to clients.
///
/// Helper for custom sending systems.
/// See also [`ServerEventAppExt::add_server_event_with`]
pub fn send<T>(
///
/// See also [`ServerEventAppExt::add_server_event_with`].
pub fn send_with<T>(
server: &mut RenetServer,
clients_info: &ClientsInfo,
channel: ServerEventChannel<T>,
mode: SendMode,
message: Vec<u8>,
) {
serialize_fn: impl Fn(&mut Cursor<Vec<u8>>) -> bincode::Result<()>,
) -> bincode::Result<()> {
match mode {
SendMode::Broadcast => {
server.broadcast_message(channel, message);
let mut previous_message = None;
for client_info in clients_info.iter() {
let message = serialize_with(client_info, previous_message, &serialize_fn)?;
server.send_message(client_info.id(), channel, message.bytes.clone());
previous_message = Some(message);
}
}
SendMode::BroadcastExcept(client_id) => {
if client_id == SERVER_ID {
server.broadcast_message(channel, message);
} else {
server.broadcast_message_except(client_id, channel, message);
let mut previous_message = None;
for client_info in clients_info.iter() {
if client_info.id() == client_id {
continue;
}
let message = serialize_with(client_info, previous_message, &serialize_fn)?;
server.send_message(client_info.id(), channel, message.bytes.clone());
previous_message = Some(message);
}
}
SendMode::Direct(client_id) => {
if client_id != SERVER_ID {
server.send_message(client_id, channel, message);
if let Some(client_info) = clients_info
.iter()
.find(|client_info| client_info.id() == client_id)
{
let message = serialize_with(client_info, None, &serialize_fn)?;
server.send_message(client_info.id(), channel, message.bytes);
}
}
}
}

Ok(())
}

fn serialize_with(
client_info: &ClientInfo,
previous_message: Option<SerializedMessage>,
serialize_fn: impl Fn(&mut Cursor<Vec<u8>>) -> bincode::Result<()>,
) -> bincode::Result<SerializedMessage> {
if let Some(previous_message) = previous_message {
if previous_message.tick == client_info.change_tick {
return Ok(previous_message);
}

let tick_size = DefaultOptions::new().serialized_size(&client_info.change_tick)? as usize;
let mut bytes = Vec::with_capacity(tick_size + previous_message.event_bytes().len());
DefaultOptions::new().serialize_into(&mut bytes, &client_info.change_tick)?;
bytes.extend_from_slice(previous_message.event_bytes());
let message = SerializedMessage {
tick: client_info.change_tick,
tick_size,
bytes: bytes.into(),
};

Ok(message)
} else {
let mut cursor = Cursor::new(Vec::new());
DefaultOptions::new().serialize_into(&mut cursor, &client_info.change_tick)?;
let tick_size = cursor.get_ref().len();
(serialize_fn)(&mut cursor)?;
let message = SerializedMessage {
tick: client_info.change_tick,
tick_size,
bytes: cursor.into_inner().into(),
};

Ok(message)
}
}

struct SerializedMessage {
tick: RepliconTick,
tick_size: usize,
bytes: Bytes,
}

impl SerializedMessage {
fn event_bytes(&self) -> &[u8] {
&self.bytes[self.tick_size..]
}
}

/// Deserializes event change tick first and then calls the specified deserialization function to get the event itself.
pub fn deserialize_with<T>(
message: &[u8],
deserialize_fn: impl FnOnce(&mut Cursor<&[u8]>) -> bincode::Result<T>,
) -> bincode::Result<(RepliconTick, T)> {
let mut cursor = Cursor::new(message);
let tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let event = (deserialize_fn)(&mut cursor)?;

Ok((tick, event))
}

/// An event that will be send to client(s).
Expand Down
20 changes: 4 additions & 16 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl Plugin for ServerPlugin {
))
.init_resource::<ClientsInfo>()
.init_resource::<ClientBuffers>()
.init_resource::<LastChangeTick>()
.init_resource::<ClientEntityMap>()
.configure_sets(PreUpdate, ServerSet::Receive.after(RenetReceive))
.configure_sets(PostUpdate, ServerSet::Send.before(RenetSend))
Expand Down Expand Up @@ -194,7 +193,6 @@ impl ServerPlugin {
ResMut<ClientEntityMap>,
ResMut<DespawnBuffer>,
ResMut<RemovalBuffer>,
ResMut<LastChangeTick>,
ResMut<ClientBuffers>,
ResMut<RenetServer>,
)>,
Expand All @@ -218,21 +216,18 @@ impl ServerPlugin {
collect_despawns(&mut messages, &mut set.p3())?;
collect_removals(&mut messages, &mut set.p4(), change_tick.this_run())?;

let last_change_tick = *set.p5();
let mut client_buffers = mem::take(&mut *set.p6());
let (last_change_tick, clients_info) = messages.send(
&mut set.p7(),
let mut client_buffers = mem::take(&mut *set.p5());
let clients_info = messages.send(
&mut set.p6(),
&mut client_buffers,
last_change_tick,
*replicon_tick,
change_tick.this_run(),
time.elapsed(),
)?;

// Return borrowed data back.
*set.p1() = clients_info;
*set.p5() = last_change_tick;
*set.p6() = client_buffers;
*set.p5() = client_buffers;

Ok(())
}
Expand Down Expand Up @@ -504,13 +499,6 @@ pub enum TickPolicy {
Manual,
}

/// Contains the last tick in which a replicated entity was spawned, despawned, or gained/lost a component.
///
/// It should be included in update messages and server events instead of the current tick
/// to avoid needless waiting for the next init message to arrive.
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct LastChangeTick(RepliconTick);

/**
A resource that exists on the server for mapping server entities to
entities that clients have already spawned. The mappings are sent to clients as part of replication
Expand Down
Loading

0 comments on commit 8ac746c

Please sign in to comment.