Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make systems standalone functions #403

Merged
merged 2 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 77 additions & 81 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ impl Plugin for ClientPlugin {
PostUpdate,
(ClientSet::Send, ClientSet::SendPackets).chain(),
)
.add_systems(Startup, Self::setup_channels)
.add_systems(Startup, setup_channels)
.add_systems(
PreUpdate,
Self::receive_replication
receive_replication
.map(Result::unwrap)
.in_set(ClientSet::Receive)
.run_if(client_connected),
)
.add_systems(PreUpdate, Self::reset.in_set(ClientSet::Reset));
.add_systems(PreUpdate, reset.in_set(ClientSet::Reset));
}

fn finish(&self, app: &mut App) {
Expand All @@ -81,90 +81,86 @@ impl Plugin for ClientPlugin {
}
}

impl ClientPlugin {
fn setup_channels(mut client: ResMut<RepliconClient>, channels: Res<RepliconChannels>) {
client.setup_server_channels(channels.server_channels().len());
}
fn setup_channels(mut client: ResMut<RepliconClient>, channels: Res<RepliconChannels>) {
client.setup_server_channels(channels.server_channels().len());
}

/// Receives and applies replication messages from the server.
///
/// Update messages are sent over the [`ReplicationChannel::Updates`] and are applied first to ensure valid state
/// for component mutations.
///
/// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear
/// ahead-of or behind update messages from the same server tick. A mutation will only be applied if its
/// update tick has already appeared in an update message, otherwise it will be buffered while waiting.
/// Since component mutations can arrive in any order, they will only be applied if they correspond to a more
/// recent server tick than the last acked server tick for each entity.
///
/// Buffered mutate messages are processed last.
///
/// Acknowledgments for received mutate messages are sent back to the server.
///
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages).
pub(super) fn receive_replication(
world: &mut World,
mut queue: Local<CommandQueue>,
mut entity_markers: Local<EntityMarkers>,
) -> bincode::Result<()> {
world.resource_scope(|world, mut client: Mut<RepliconClient>| {
world.resource_scope(|world, mut entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, mut buffered_mutations: Mut<BufferedMutations>| {
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| {
world.resource_scope(|world, registry: Mut<ReplicationRegistry>| {
world.resource_scope(
|world, mut replicated_events: Mut<Events<EntityReplicated>>| {
let mut stats =
world.remove_resource::<ClientReplicationStats>();
let mut mutate_ticks =
world.remove_resource::<ServerMutateTicks>();
let mut params = ReceiveParams {
queue: &mut queue,
entity_markers: &mut entity_markers,
entity_map: &mut entity_map,
replicated_events: &mut replicated_events,
mutate_ticks: mutate_ticks.as_mut(),
stats: stats.as_mut(),
command_markers: &command_markers,
registry: &registry,
};

apply_replication(
world,
&mut params,
&mut client,
&mut buffered_mutations,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}
if let Some(mutate_ticks) = mutate_ticks {
world.insert_resource(mutate_ticks);
}

Ok(())
},
)
})
/// Receives and applies replication messages from the server.
///
/// Update messages are sent over the [`ReplicationChannel::Updates`] and are applied first to ensure valid state
/// for component mutations.
///
/// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear
/// ahead-of or behind update messages from the same server tick. A mutation will only be applied if its
/// update tick has already appeared in an update message, otherwise it will be buffered while waiting.
/// Since component mutations can arrive in any order, they will only be applied if they correspond to a more
/// recent server tick than the last acked server tick for each entity.
///
/// Buffered mutate messages are processed last.
///
/// Acknowledgments for received mutate messages are sent back to the server.
///
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages).
pub(super) fn receive_replication(
world: &mut World,
mut queue: Local<CommandQueue>,
mut entity_markers: Local<EntityMarkers>,
) -> bincode::Result<()> {
world.resource_scope(|world, mut client: Mut<RepliconClient>| {
world.resource_scope(|world, mut entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, mut buffered_mutations: Mut<BufferedMutations>| {
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| {
world.resource_scope(|world, registry: Mut<ReplicationRegistry>| {
world.resource_scope(
|world, mut replicated_events: Mut<Events<EntityReplicated>>| {
let mut stats = world.remove_resource::<ClientReplicationStats>();
let mut mutate_ticks = world.remove_resource::<ServerMutateTicks>();
let mut params = ReceiveParams {
queue: &mut queue,
entity_markers: &mut entity_markers,
entity_map: &mut entity_map,
replicated_events: &mut replicated_events,
mutate_ticks: mutate_ticks.as_mut(),
stats: stats.as_mut(),
command_markers: &command_markers,
registry: &registry,
};

apply_replication(
world,
&mut params,
&mut client,
&mut buffered_mutations,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}
if let Some(mutate_ticks) = mutate_ticks {
world.insert_resource(mutate_ticks);
}

Ok(())
},
)
})
})
})
})
}
})
}

fn reset(
mut update_tick: ResMut<ServerUpdateTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
stats: Option<ResMut<ClientReplicationStats>>,
) {
*update_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
if let Some(mut stats) = stats {
*stats = Default::default();
}
fn reset(
mut update_tick: ResMut<ServerUpdateTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
stats: Option<ResMut<ClientReplicationStats>>,
) {
*update_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
if let Some(mut stats) = stats {
*stats = Default::default();
}
}

Expand Down
149 changes: 71 additions & 78 deletions src/client/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,122 +17,115 @@ impl Plugin for ClientDiagnosticsPlugin {
app.init_resource::<ClientReplicationStats>()
.add_systems(
PreUpdate,
Self::add_measurements
add_measurements
.in_set(ClientSet::Diagnostics)
.run_if(client_connected),
)
.register_diagnostic(
Diagnostic::new(Self::RTT)
Diagnostic::new(RTT)
.with_suffix(" s")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::PACKET_LOSS)
Diagnostic::new(PACKET_LOSS)
.with_suffix(" %")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::SENT_BPS)
Diagnostic::new(SENT_BPS)
.with_suffix(" byte/s")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::RECEIVED_BPS)
Diagnostic::new(RECEIVED_BPS)
.with_suffix(" byte/s")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::ENTITIES_CHANGED)
Diagnostic::new(ENTITIES_CHANGED)
.with_suffix(" entities changed")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::COMPONENTS_CHANGED)
Diagnostic::new(COMPONENTS_CHANGED)
.with_suffix(" components changed")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::MAPPINGS)
Diagnostic::new(MAPPINGS)
.with_suffix(" mappings")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::DESPAWNS)
Diagnostic::new(DESPAWNS)
.with_suffix(" despawns")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::REPLICATION_MESSAGES)
Diagnostic::new(REPLICATION_MESSAGES)
.with_suffix(" replication messages")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
)
.register_diagnostic(
Diagnostic::new(Self::REPLICATION_BYTES)
Diagnostic::new(REPLICATION_BYTES)
.with_suffix(" replication bytes")
.with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN),
.with_max_history_length(DIAGNOSTIC_HISTORY_LEN),
);
}
}

impl ClientDiagnosticsPlugin {
/// Round-trip time.
pub const RTT: DiagnosticPath = DiagnosticPath::const_new("client/rtt");
/// The percent of packet loss.
pub const PACKET_LOSS: DiagnosticPath = DiagnosticPath::const_new("client/packet_loss");
/// How many messages sent per second.
pub const SENT_BPS: DiagnosticPath = DiagnosticPath::const_new("client/sent_bps");
/// How many bytes received per second.
pub const RECEIVED_BPS: DiagnosticPath = DiagnosticPath::const_new("client/received_bps");
/// Round-trip time.
pub const RTT: DiagnosticPath = DiagnosticPath::const_new("client/rtt");
/// The percent of packet loss.
pub const PACKET_LOSS: DiagnosticPath = DiagnosticPath::const_new("client/packet_loss");
/// How many messages sent per second.
pub const SENT_BPS: DiagnosticPath = DiagnosticPath::const_new("client/sent_bps");
/// How many bytes received per second.
pub const RECEIVED_BPS: DiagnosticPath = DiagnosticPath::const_new("client/received_bps");

/// How many entities changed by replication.
pub const ENTITIES_CHANGED: DiagnosticPath =
DiagnosticPath::const_new("client/replication/entities_changed");
/// How many components changed by replication.
pub const COMPONENTS_CHANGED: DiagnosticPath =
DiagnosticPath::const_new("client/replication/components_changed");
/// How many client-mappings added by replication.
pub const MAPPINGS: DiagnosticPath = DiagnosticPath::const_new("client/replication/mappings");
/// How many despawns applied by replication.
pub const DESPAWNS: DiagnosticPath = DiagnosticPath::const_new("client/replication/despawns");
/// How many replication messages received.
pub const REPLICATION_MESSAGES: DiagnosticPath =
DiagnosticPath::const_new("client/replication/messages");
/// How many replication bytes received.
pub const REPLICATION_BYTES: DiagnosticPath =
DiagnosticPath::const_new("client/replication/bytes");
/// How many entities changed by replication.
pub const ENTITIES_CHANGED: DiagnosticPath =
DiagnosticPath::const_new("client/replication/entities_changed");
/// How many components changed by replication.
pub const COMPONENTS_CHANGED: DiagnosticPath =
DiagnosticPath::const_new("client/replication/components_changed");
/// How many client-mappings added by replication.
pub const MAPPINGS: DiagnosticPath = DiagnosticPath::const_new("client/replication/mappings");
/// How many despawns applied by replication.
pub const DESPAWNS: DiagnosticPath = DiagnosticPath::const_new("client/replication/despawns");
/// How many replication messages received.
pub const REPLICATION_MESSAGES: DiagnosticPath =
DiagnosticPath::const_new("client/replication/messages");
/// How many replication bytes received.
pub const REPLICATION_BYTES: DiagnosticPath = DiagnosticPath::const_new("client/replication/bytes");

/// Max diagnostic history length.
pub const DIAGNOSTIC_HISTORY_LEN: usize = 60;
/// Max diagnostic history length.
pub const DIAGNOSTIC_HISTORY_LEN: usize = 60;

fn add_measurements(
mut diagnostics: Diagnostics,
stats: Res<ClientReplicationStats>,
mut last_stats: Local<ClientReplicationStats>,
client: Res<RepliconClient>,
) {
diagnostics.add_measurement(&Self::RTT, || client.rtt());
diagnostics.add_measurement(&Self::PACKET_LOSS, || client.packet_loss());
diagnostics.add_measurement(&Self::SENT_BPS, || client.sent_bps());
diagnostics.add_measurement(&Self::RECEIVED_BPS, || client.received_bps());
fn add_measurements(
mut diagnostics: Diagnostics,
stats: Res<ClientReplicationStats>,
mut last_stats: Local<ClientReplicationStats>,
client: Res<RepliconClient>,
) {
diagnostics.add_measurement(&RTT, || client.rtt());
diagnostics.add_measurement(&PACKET_LOSS, || client.packet_loss());
diagnostics.add_measurement(&SENT_BPS, || client.sent_bps());
diagnostics.add_measurement(&RECEIVED_BPS, || client.received_bps());

diagnostics.add_measurement(&Self::ENTITIES_CHANGED, || {
(stats.entities_changed - last_stats.entities_changed) as f64
});
diagnostics.add_measurement(&Self::COMPONENTS_CHANGED, || {
(stats.components_changed - last_stats.components_changed) as f64
});
diagnostics.add_measurement(&Self::MAPPINGS, || {
(stats.mappings - last_stats.mappings) as f64
});
diagnostics.add_measurement(&Self::DESPAWNS, || {
(stats.despawns - last_stats.despawns) as f64
});
diagnostics.add_measurement(&Self::REPLICATION_MESSAGES, || {
(stats.messages - last_stats.messages) as f64
});
diagnostics.add_measurement(&Self::REPLICATION_BYTES, || {
(stats.bytes - last_stats.bytes) as f64
});
*last_stats = *stats;
}
diagnostics.add_measurement(&ENTITIES_CHANGED, || {
(stats.entities_changed - last_stats.entities_changed) as f64
});
diagnostics.add_measurement(&COMPONENTS_CHANGED, || {
(stats.components_changed - last_stats.components_changed) as f64
});
diagnostics.add_measurement(&MAPPINGS, || (stats.mappings - last_stats.mappings) as f64);
diagnostics.add_measurement(&DESPAWNS, || (stats.despawns - last_stats.despawns) as f64);
diagnostics.add_measurement(&REPLICATION_MESSAGES, || {
(stats.messages - last_stats.messages) as f64
});
diagnostics.add_measurement(&REPLICATION_BYTES, || {
(stats.bytes - last_stats.bytes) as f64
});
*last_stats = *stats;
}
Loading