Skip to content

Commit

Permalink
Merge pull request #276 from TruncateGame/fix/network-boo-boos
Browse files Browse the repository at this point in the history
Improve resilience of websocket messages
  • Loading branch information
bglw authored Aug 17, 2024
2 parents f07ca52 + 1e1106f commit 3af09d2
Show file tree
Hide file tree
Showing 14 changed files with 350 additions and 66 deletions.
3 changes: 2 additions & 1 deletion truncate_client/src/handle_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub fn handle_server_msg(outer: &mut OuterApplication, ui: &mut egui::Ui) {

while let Ok(msg) = recv() {
match msg {
GameMessage::Ping => {}
GameMessage::Ping | GameMessage::Ack(_) | GameMessage::PleaseLogin => { /* handled at comms layer */
}
GameMessage::JoinedLobby(player_index, id, players, board, token) => {
// If we're already in a lobby, treat this as a lobby update
// (the websocket probably dropped and reconnected)
Expand Down
44 changes: 44 additions & 0 deletions truncate_client/src/lil_bits/result_modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ pub struct ResultModalResigning {
msg: String,
}

#[derive(Clone)]
pub struct ResultModalLoading {}

#[derive(Clone)]
pub enum ResultModalVariant {
Daily(ResultModalDaily),
Unique(ResultModalUnique),
Resigning(ResultModalResigning),
Loading(ResultModalLoading),
}

#[derive(Clone)]
Expand Down Expand Up @@ -134,13 +138,22 @@ impl ResultModalUI {
}),
}
}

pub fn new_resigning(ui: &mut egui::Ui, msg: String) -> Self {
ResultModalUI::seed_animations(ui);

Self {
contents: ResultModalVariant::Resigning(ResultModalResigning { msg }),
}
}

pub fn new_loading(ui: &mut egui::Ui) -> Self {
ResultModalUI::seed_animations(ui);

Self {
contents: ResultModalVariant::Loading(ResultModalLoading {}),
}
}
}

#[derive(Hash)]
Expand Down Expand Up @@ -374,6 +387,25 @@ impl ResultModalUI {
ui,
);
}
ResultModalVariant::Loading(_l) => {
let summary_text = TextHelper::heavy("Loading", 12.0, None, &mut ui);

summary_text.paint_within(
heading_rect.translate(vec2(0.0, -(heading_rect.height() / 2.0 + 8.0))),
Align2::CENTER_BOTTOM,
Color32::WHITE,
ui,
);

let summary_text = TextHelper::heavy("Statistics", 12.0, None, &mut ui);

summary_text.paint_within(
heading_rect.translate(vec2(0.0, heading_rect.height() / 2.0 + 8.0)),
Align2::CENTER_TOP,
Color32::WHITE,
ui,
);
}
}

// Wait for the main text to move out of the way before showing details
Expand Down Expand Up @@ -504,6 +536,18 @@ impl ResultModalUI {
msg = Some(ResultModalAction::Dismiss);
}
}
ResultModalVariant::Loading(_l) => {
ui.add_space(50.0);

let summary_text = TextHelper::heavy("Waiting for", 10.0, None, &mut ui);

summary_text.paint(Color32::WHITE, ui, true);

let summary_text =
TextHelper::heavy("network connection", 10.0, None, &mut ui);

summary_text.paint(Color32::WHITE, ui, true);
}
};

// Paint over everything below the heading stats to fade them in from black
Expand Down
4 changes: 4 additions & 0 deletions truncate_client/src/native_comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use truncate_core::messages::{GameMessage, PlayerMessage};

/*
TODO: Implement the pending_messages retry flow from web_comms
*/

pub async fn connect(
connect_addr: String,
tx_game: mpsc::Sender<GameMessage>,
Expand Down
13 changes: 12 additions & 1 deletion truncate_client/src/regions/single_player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,15 @@ impl SinglePlayerState {
));
}
}

if self.splash.is_none() {
// TODO: Add a special splash screen for somehow having no token / not being logged in
self.splash = Some(ResultModalUI::new_loading(&mut ui));

if let Some(token) = logged_in_as {
msgs_to_server.push(PlayerMessage::RequestStats(token.clone()));
}
}
} else {
if self.splash.is_none() {
self.splash = Some(ResultModalUI::new_unique(
Expand Down Expand Up @@ -637,7 +646,9 @@ impl SinglePlayerState {
moves: self.move_sequence.clone(),
won: self.winner == Some(human_player),
});
msgs_to_server.push(PlayerMessage::RequestStats(token.clone()));

// Ensure we never pull up an old splash screen without this move
self.daily_stats = None;
}
}
}
Expand Down
152 changes: 130 additions & 22 deletions truncate_client/src/web_comms.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};

use eframe::egui::Context;
use eframe::web_sys;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use futures_util::{future, pin_mut, StreamExt};
use truncate_core::messages::{GameMessage, PlayerMessage};
use truncate_core::messages::{GameMessage, Nonce, NoncedPlayerMessage, PlayerMessage};
use web_sys::console;
use ws_stream_wasm::{WsMessage, WsMeta, WsStream};

use crate::utils::macros::current_time;

async fn websocket_connect(connect_addr: &String) -> Result<WsStream, ()> {
console::log_1(&format!("Connecting to {connect_addr}").into());

Expand All @@ -34,16 +38,58 @@ pub async fn connect(
context = Some(ctx);
}

let most_recent_token: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let most_recent_game_token: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let most_recent_login: Arc<Mutex<Option<PlayerMessage>>> = Arc::new(Mutex::new(None));

let requested_login = AtomicBool::new(false);

type NonceQueue = VecDeque<(Option<Nonce>, WsMessage)>;
let mut pending_messages: NonceQueue = VecDeque::new();
let unconfirmed_messages: Arc<Mutex<NonceQueue>> = Arc::new(Mutex::new(VecDeque::new()));

let mut current_nonce = 0_u64;
let mut get_nonce = || {
current_nonce += 1;
Nonce {
generated_at: current_time!().as_secs(),
id: current_nonce,
}
};

let mut outgoing_msg_stream = rx_player.map(|msg| {
let mut outgoing_msg_stream = rx_player.map(|message| {
// Store a token that we're interacting with, in case we need to
// recreate the connection.
if let PlayerMessage::RejoinGame(token) = &msg {
*most_recent_token.lock().unwrap() = Some(token.to_string());
if let PlayerMessage::RejoinGame(token) = &message {
*most_recent_game_token.lock().unwrap() = Some(token.to_string());
}

if let PlayerMessage::Login { .. } = &message {
*most_recent_login.lock().unwrap() = Some(message.clone());
}

WsMessage::Text(serde_json::to_string(&msg.clone()).unwrap())
match &message {
// Avoid noncing pings since we don't care about any individual ping.
// Avoid noncing pre-login methods, as nonces don't work if the player is not logged in.
PlayerMessage::Ping
| PlayerMessage::Login { .. }
| PlayerMessage::CreateAnonymousPlayer { .. } => (
None,
WsMessage::Text(serde_json::to_string(&message).unwrap()),
),
_ => {
let nonce = get_nonce();

let wrapped_msg = NoncedPlayerMessage {
nonce: nonce.clone(),
message,
};

(
Some(nonce),
WsMessage::Text(serde_json::to_string(&wrapped_msg).unwrap()),
)
}
}
});

loop {
Expand All @@ -55,7 +101,14 @@ pub async fn connect(

let (mut outgoing, incoming) = wsio.split();

if let Some(token) = most_recent_token.lock().unwrap().clone() {
if let Some(login) = most_recent_login.lock().unwrap().clone() {
let encoded_login_msg = WsMessage::Text(serde_json::to_string(&login).unwrap());
if outgoing.send(encoded_login_msg).await.is_err() {
continue;
};
}

if let Some(token) = most_recent_game_token.lock().unwrap().clone() {
let reconnection_msg = PlayerMessage::RejoinGame(token);
let encoded_reconnection_msg =
WsMessage::Text(serde_json::to_string(&reconnection_msg).unwrap());
Expand All @@ -64,6 +117,12 @@ pub async fn connect(
};
}

{
let mut unconfirmed = unconfirmed_messages.lock().unwrap();
unconfirmed.extend(pending_messages.drain(..));
std::mem::swap(&mut *unconfirmed, &mut pending_messages);
}

let game_messages = {
incoming.for_each(|msg| async {
let parsed_msg: GameMessage = match msg {
Expand All @@ -73,14 +132,34 @@ pub async fn connect(
}
};

if matches!(parsed_msg, GameMessage::Ping) {
_ = tx_player.clone().send(PlayerMessage::Ping).await;
}

// Store a token that we're interacting with, in case we need to
// recreate the connection.
if let GameMessage::JoinedLobby(_, _, _, _, token) = &parsed_msg {
*most_recent_token.lock().unwrap() = Some(token.to_string());
match &parsed_msg {
GameMessage::Ping => {
_ = tx_player.clone().send(PlayerMessage::Ping).await;
}
GameMessage::PleaseLogin => {
requested_login.store(true, std::sync::atomic::Ordering::Relaxed);
}
GameMessage::Ack(nonce) => {
let mut msgs = unconfirmed_messages.lock().unwrap();
if let Some(pos) = msgs
.iter()
.position(|(n, _)| n.as_ref().is_some_and(|n| n == nonce))
{
if pos != 0 {
tracing::warn!("Received an out of order ack from server at {pos}");
}

for _ in 0..=pos {
msgs.pop_front();
}
}
}
GameMessage::JoinedLobby(_, _, _, _, token) => {
// Store a token that we're interacting with, in case we need to
// recreate the connection.
*most_recent_game_token.lock().unwrap() = Some(token.to_string());
}
_ => { /* no processing needed */ }
}

tx_game
Expand All @@ -96,13 +175,42 @@ pub async fn connect(

let player_messages = async {
loop {
let msg = outgoing_msg_stream.next().await;
if let Some(msg) = msg {
if outgoing.send(msg).await.is_err() {
continue;
};
} else {
panic!("Internal stream closed");
if pending_messages.is_empty() {
match outgoing_msg_stream.next().await {
Some(msg) => {
pending_messages.push_back(msg);
}
None => {
panic!("Internal stream closed");
}
}
};

if requested_login.load(std::sync::atomic::Ordering::Relaxed) {
if let Some(login) = most_recent_login.lock().unwrap().as_ref() {
requested_login.store(false, std::sync::atomic::Ordering::Relaxed);
pending_messages.push_front((
None,
WsMessage::Text(serde_json::to_string(&login).unwrap()),
))
}
}

if let Some(msg) = pending_messages.get(0).cloned() {
match outgoing.send(msg.1).await {
Ok(()) => {
if let (Some(nonce), msg) = pending_messages
.pop_front()
.expect("nothing else should remove from pending_messages")
{
let mut unconfirmed = unconfirmed_messages.lock().unwrap();
unconfirmed.push_back((Some(nonce), msg))
}
}
Err(err) => {
tracing::debug!("Send err: {err:?}");
}
}
}
}
};
Expand Down
9 changes: 7 additions & 2 deletions truncate_core/src/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct Game {
}

// TODO: Move this to a helper file somewhere
fn now() -> u64 {
pub fn now() -> u64 {
instant::SystemTime::now()
.duration_since(instant::SystemTime::UNIX_EPOCH)
.expect("Please don't play Truncate before 1970")
Expand Down Expand Up @@ -318,13 +318,18 @@ impl Game {
return Err(GamePlayError::NonAdjacentPlace);
}

changes.push(self.players[player].use_tile(tile, &mut self.bag)?);
if !self.players[player].has_tile(tile) {
return Err(GamePlayError::PlayerDoesNotHaveTile { player, tile });
}

changes.push(Change::Board(BoardChange {
detail: self
.board
.set(position, player, tile, attacker_dictionary)?,
action: BoardChangeAction::Added,
}));
changes.push(self.players[player].use_tile(tile, &mut self.bag)?);

self.resolve_attack(
player,
position,
Expand Down
Loading

0 comments on commit 3af09d2

Please sign in to comment.