Skip to content

Commit

Permalink
refac & redoc
Browse files Browse the repository at this point in the history
  • Loading branch information
splurf committed Feb 29, 2024
1 parent 4ff34b9 commit e35af30
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 145 deletions.
211 changes: 89 additions & 122 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,176 +7,143 @@ mod util;
use {
cfg::*,
clap::Parser,
consts::DELAY,
consts::{DELAY, INIT},
err::*,
std::{
collections::HashMap,
io::Write,
net::{TcpListener, TcpStream},
thread::{sleep, spawn, JoinHandle},
net::{SocketAddr, TcpListener, TcpStream},
thread::{sleep, JoinHandle},
},
sync::*,
util::*,
};

/** Spawn a new thread that repeatedly calls the provided function until the break predicate is modified */
fn init_handler(
mut f: impl FnMut(&mut bool) -> Result<()> + Send + 'static,
) -> JoinHandle<Result<()>> {
spawn(move || -> Result<()> {
let mut stop = false; // the "break" determinant

loop {
// Call the function
if let Err(e) = f(&mut stop) {
eprintln!("Error: {}", e)
}
// End loop if the determinant is true
if stop {
break Ok(());
}
}
})
}

/** Validate and instantiate streams into the system */
fn incoming_handler(
server: TcpListener,
streams: CondLock<HashMap<u16, TcpStream>>,
path: String,
) -> JoinHandle<Result<()>> {
init_handler(move |_| {
// Handle any potential stream waiting to be accepted by the server
let (stream, addr) = server.accept()?;
// Validate then setup the stream
handle_stream(stream, addr.port(), streams.write()?, &path)?;
// Notify `streams` of a new connection
*streams.lock()? = true;
streams.notify();
Ok(())
})
}

/** Automatically remove any disconnected clients */
/// Automatically remove any disconnected clients.
fn error_handler(
streams: CondLock<HashMap<u16, TcpStream>>,
disconnected: CondLock<Vec<u16>>,
streams: CondLock<HashMap<SocketAddr, TcpStream>>,
disconnected: CondLock<Vec<SocketAddr>>,
) -> JoinHandle<Result<()>> {
init_handler(move |_| {
// Wait for a connection to be lost
init_handler(move || {
// wait for a connection to be lost
disconnected.wait()?;

{
// Acquire and hold onto the write guards
// acquire and hold onto the write guards
let mut gw_disconnected = disconnected.write()?;
let mut gw_streams = streams.write()?;

// Remove every disconnected stream
// remove every disconnected stream
while let Some(ip) = gw_disconnected.pop() {
gw_streams.remove(&ip);
}
}
/*
* Update the boolean predicate of `streams` so the main thread
* knows when to pause due to there being no connections
*/
// update the predicate of `streams` so the main thread
// knows when to pause due to there being no connections
let is_empty = streams.read()?.is_empty();
*streams.lock()? = !is_empty;

// Reset the boolean predicate
// reset the boolean predicate
*disconnected.lock()? = false;
Ok(())
})
}

/// Validate and instantiate streams into the system.
fn incoming_handler(
server: TcpListener,
streams: CondLock<HashMap<SocketAddr, TcpStream>>,
path: String,
) -> JoinHandle<Result<()>> {
init_handler(move || {
// handle any potential stream waiting to be accepted by the server
let (mut stream, addr) = server.accept()?;

// determine the authenticity of the stream
verify_stream(&stream, &path)?;

// setup the client's terminal
stream.write_all(INIT)?;

// add the stream to the map
streams.write()?.insert(addr, stream);

// notify `streams` of a new connection
*streams.lock()? = true;
streams.notify();
Ok(())
})
}

fn main() -> Result<()> {
// Handle any arguments
// handle any program arguments
let cfg = Config::parse();

// Instantiate the listener
// instantiate the listener
let server = TcpListener::bind(cfg.addr())?;

// Currently connected clients
let streams = CondLock::<HashMap<u16, TcpStream>>::default();
// currently connected clients
let streams = CondLock::<HashMap<SocketAddr, TcpStream>>::default();

// Clients that have disconnected
let disconnected = CondLock::<Vec<u16>>::default();
// clients that have disconnected
let disconnected = CondLock::<Vec<SocketAddr>>::default();

// Instantiate handlers (external threads)
let th1 = incoming_handler(server, streams.clone(), cfg.path().to_string());
let th2 = error_handler(streams.clone(), disconnected.clone());
// init handlers
_ = error_handler(streams.clone(), disconnected.clone());
_ = incoming_handler(server, streams.clone(), cfg.path().to_string());

// Original frames
// generate the original frames
let mut frames = donuts(); // 559234 bytes

// Trim the majority of the unnecessary whitespace
// trim redundant whitespace
trim_frames(&mut frames); // 395012 bytes (~29% smaller)

println!("Listening @ http://{}{}\n", cfg.addr(), cfg.path());

/*
* This is essentially the main thread
*
* As long as there is at least one connection,
* distribute the frames to each client, otherwise,
* put the thread to sleep.
*/
init_handler(move |stop| {
/*
* If either of these threads have finished,
* end the main thread because an
* unexpected error has occurred
*/
if th1.is_finished() || th2.is_finished() {
*stop = true;
Ok(())
} else {
// Wait if and only if there are no connections
streams.wait()?;

// Distribute the frames to each client
for frame in frames.iter() {
/*
* Discontinue distributing frames and pause
* this thread if there are no connections
*/
if !*streams.lock()? {
break;
}
// as long as there is at least one connection,
// distribute the frames to each client, otherwise,
// the thread remains paused.
init_handler(move || {
// wait if and only if there are no connections
streams.wait()?;

// distribute the frames to each client
for frame in frames.iter() {
// discontinue distributing frames and pause
// this thread if there are no connections
if !*streams.lock()? {
break;
}

let res = {
/*
* Acquire the writing guard of `disconnected`
* This doesn't cause a deadlock because `disconnected` is
* only ever externally accessed after the end of this scope,
* which is covered as this guard gets automatically dropped
*/
let mut g = disconnected.write()?;

// Send each stream the current frame
for (ip, mut stream) in streams.read()?.iter() {
// Completely remove the client if they have disconnected
if stream.write_all(frame).is_err() {
g.push(*ip);
}
// the number of disconnections
let res = {
// acquire the writing guard of `disconnected`.
// This doesn't cause a deadlock because `disconnected` is
// only ever externally accessed after the end of this scope,
// which is covered as this guard gets automatically dropped
let mut g = disconnected.write()?;

// send each stream the current frame
for (ip, mut stream) in streams.read()?.iter() {
// remove the client if they have disconnected
if stream.write_all(frame).is_err() {
g.push(*ip);
}
// Determinant for whether there have been any disconnections
g.len() > 0
};
/*
* This only happens if there was at least one disconnection.
*
* Have `th2` handle the disconnections, while
* this thread sleeps for the normal duration
*/
if res {
*disconnected.lock()? = true;
disconnected.notify();
}
sleep(DELAY)
// determinant for whether there have been any disconnections
g.len() > 0
};

// notify `disconnected` due to a disconnection
if res {
*disconnected.lock()? = true;
disconnected.notify();
}
Ok(())

// the pause between each frame
sleep(DELAY)
}
Ok(())
})
.join()?
}
37 changes: 14 additions & 23 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use {
crate::{
consts::{CHARACTERS, INIT},
err::*,
},
crate::{consts::CHARACTERS, err::*},
httparse::{Request, EMPTY_HEADER},
std::{
collections::HashMap,
f32::consts::TAU,
io::{Read, Write},
io::Read,
net::TcpStream,
sync::RwLockWriteGuard,
thread::{spawn, JoinHandle},
},
uriparse::Path,
};
Expand Down Expand Up @@ -131,7 +127,7 @@ pub fn trim_frames(frames: &mut [Vec<u8>; 314]) {
}

/// Verify the potential client by checking if the User-Agent's product is `curl` and a few other practicalities
fn verify_stream(mut stream: &TcpStream, uri_path: &str) -> Result<()> {
pub fn verify_stream(mut stream: &TcpStream, uri_path: &str) -> Result<()> {
// Read from the incoming stream
let mut buf = [0; 128];
let bytes = stream.read(&mut buf)?;
Expand Down Expand Up @@ -171,19 +167,14 @@ fn verify_stream(mut stream: &TcpStream, uri_path: &str) -> Result<()> {
}
}

/// Handler for every potential client
pub fn handle_stream(
mut stream: TcpStream,
port: u16,
mut streams: RwLockWriteGuard<HashMap<u16, TcpStream>>,
path: &str,
) -> Result<()> {
// Determine the authenticity of the stream
verify_stream(&stream, path)?;

// Setup the client's terminal
stream.write_all(INIT)?;
// Add the stream to the map
streams.insert(port, stream);
Ok(())
/// Spawn a new thread that repeatedly calls the provided function.
pub fn init_handler(mut f: impl FnMut() -> Result<()> + Send + 'static) -> JoinHandle<Result<()>> {
spawn(move || -> Result<()> {
loop {
// call the function
if let Err(e) = f() {
eprintln!("Error: {}", e)
}
}
})
}

0 comments on commit e35af30

Please sign in to comment.