diff --git a/Cargo.toml b/Cargo.toml index 340f035..ac68439 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "graphul" -version = "0.4.5" +version = "0.4.6" edition = "2021" license = "MIT" categories = ["asynchronous", "network-programming", "web-programming::http-server"] @@ -13,9 +13,9 @@ readme = "README.md" [dependencies] hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } +tokio = { version = "1.24", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } -axum = { version = "0.6", features = ["multipart"] } +axum = { version = "0.6", features = ["multipart", "ws", "headers"] } askama = "0.11" futures = "0.3.24" tower = { version = "0.4", features = ["util"] } diff --git a/examples/chat-websocket/Cargo.toml b/examples/chat-websocket/Cargo.toml new file mode 100644 index 0000000..2a9f65f --- /dev/null +++ b/examples/chat-websocket/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "example-chat" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +graphul = { path = "../../." } +tokio = { version = "1", features = ["full"] } +futures = "0.3" diff --git a/examples/chat-websocket/src/chat/domain.rs b/examples/chat-websocket/src/chat/domain.rs new file mode 100644 index 0000000..f61342b --- /dev/null +++ b/examples/chat-websocket/src/chat/domain.rs @@ -0,0 +1,22 @@ +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; + +use tokio::sync::broadcast; + +// Our shared state +pub struct AppState { + // We require unique usernames. This tracks which usernames have been taken. + pub user_set: Mutex>, + // Channel used to send messages to all connected clients. + pub tx: broadcast::Sender, +} + +pub fn app_state() -> Arc { + // Set up application state for use with share_state. + let user_set = Mutex::new(HashSet::new()); + let (tx, _rx) = broadcast::channel(100); + + Arc::new(AppState { user_set, tx }) +} diff --git a/examples/chat-websocket/src/chat/handlers.rs b/examples/chat-websocket/src/chat/handlers.rs new file mode 100644 index 0000000..3494c02 --- /dev/null +++ b/examples/chat-websocket/src/chat/handlers.rs @@ -0,0 +1,105 @@ +use futures::{sink::SinkExt, stream::StreamExt}; +use graphul::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + State, + }, + IntoResponse, +}; +use std::sync::Arc; + +use crate::domain; + +pub async fn websocket_handler( + ws: WebSocketUpgrade, + State(state): State>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| websocket(socket, state)) +} + +// This function deals with a single websocket connection, i.e., a single +// connected client / user, for which we will spawn two independent tasks (for +// receiving / sending chat messages). +async fn websocket(stream: WebSocket, state: Arc) { + // By splitting, we can send and receive at the same time. + let (mut sender, mut receiver) = stream.split(); + + // Username gets set in the receive loop, if it's valid. + let mut username = String::new(); + // Loop until a text message is found. + while let Some(Ok(message)) = receiver.next().await { + if let Message::Text(name) = message { + // If username that is sent by client is not taken, fill username string. + check_username(&state, &mut username, &name); + + // If not empty we want to quit the loop else we want to quit function. + if !username.is_empty() { + break; + } else { + // Only send our client that username is taken. + let _ = sender + .send(Message::Text(String::from("Username already taken."))) + .await; + + return; + } + } + } + + // We subscribe *before* sending the "joined" message, so that we will also + // display it to our client. + let mut rx = state.tx.subscribe(); + + // Now send the "joined" message to all subscribers. + let msg = format!("{} joined.", username); + println!("{}", msg); + let _ = state.tx.send(msg); + + // Spawn the first task that will receive broadcast messages and send text + // messages over the websocket to our client. + let mut send_task = tokio::spawn(async move { + while let Ok(msg) = rx.recv().await { + // In any websocket error, break loop. + if sender.send(Message::Text(msg)).await.is_err() { + break; + } + } + }); + + // Clone things we want to pass (move) to the receiving task. + let tx = state.tx.clone(); + let name = username.clone(); + + // Spawn a task that takes messages from the websocket, prepends the user + // name, and sends them to all broadcast subscribers. + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(Message::Text(text))) = receiver.next().await { + // Add username before message. + let _ = tx.send(format!("{}: {}", name, text)); + } + }); + + // If any one of the tasks run to completion, we abort the other. + tokio::select! { + _ = (&mut send_task) => recv_task.abort(), + _ = (&mut recv_task) => send_task.abort(), + }; + + // Send "user left" message (similar to "joined" above). + let msg = format!("{} left.", username); + println!("{}", msg); + let _ = state.tx.send(msg); + + // Remove username from map so new clients can take it again. + state.user_set.lock().unwrap().remove(&username); +} + +fn check_username(state: &domain::AppState, string: &mut String, name: &str) { + let mut user_set = state.user_set.lock().unwrap(); + + if !user_set.contains(name) { + user_set.insert(name.to_owned()); + + string.push_str(name); + } +} diff --git a/examples/chat-websocket/src/chat/mod.rs b/examples/chat-websocket/src/chat/mod.rs new file mode 100644 index 0000000..07257dc --- /dev/null +++ b/examples/chat-websocket/src/chat/mod.rs @@ -0,0 +1,2 @@ +pub mod domain; +pub mod handlers; diff --git a/examples/chat-websocket/src/main.rs b/examples/chat-websocket/src/main.rs new file mode 100644 index 0000000..de383da --- /dev/null +++ b/examples/chat-websocket/src/main.rs @@ -0,0 +1,21 @@ +//! Example chat application. +//! + +mod chat; + +use chat::{domain, handlers}; + +use graphul::{http::Methods, FileConfig, Graphul}; + +#[tokio::main] +async fn main() { + let state = domain::app_state(); + + let mut app = Graphul::share_state(state); + + app.static_file("/", "templates/index.html", FileConfig::default()); + + app.get("/websocket", handlers::websocket_handler); + + app.run("127.0.0.1:3000").await; +} diff --git a/examples/chat-websocket/templates/index.html b/examples/chat-websocket/templates/index.html new file mode 100644 index 0000000..7745547 --- /dev/null +++ b/examples/chat-websocket/templates/index.html @@ -0,0 +1,52 @@ + + + + + WebSocket Chat + + +

WebSocket Chat Example With Graphul

+ + + + + + + + + diff --git a/src/app.rs b/src/app.rs index 721f5e6..0948329 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1 +1 @@ -pub const VERSION: &str = "0.4.5"; +pub const VERSION: &str = "0.4.6";