Skip to content

Commit

Permalink
refac
Browse files Browse the repository at this point in the history
  • Loading branch information
splurf committed Jun 8, 2024
1 parent 386d659 commit f9088db
Show file tree
Hide file tree
Showing 5 changed files with 509 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/base/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::time::Duration;

/// The initial HTTP response headers appended with the `ESC[2J` erase function
pub const INIT: &[u8] =
b"HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n\x1b[2J";

/// The delay between each frame
/// - 20.833333ms => ~48 FPS
pub const DELAY: Duration = Duration::from_nanos(20833333);

/// The characters required to make each donut frame
pub const CHARACTERS: [u8; 12] = [46, 44, 45, 126, 58, 59, 61, 33, 42, 35, 36, 64];
120 changes: 120 additions & 0 deletions src/base/err.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
pub type Result<T, E = Error> = std::result::Result<T, E>;

pub enum UriError {
HttpParse(httparse::Error),
Method(String),
Path(String),
Version(u8),
Header(String),
}

impl<'a> From<&mut httparse::Header<'a>> for UriError {
fn from(value: &mut httparse::Header<'a>) -> Self {
Self::Header(format!(
"{{ {}: {} }}",
value.name,
String::from_utf8_lossy(value.value)
))
}
}

impl std::fmt::Display for UriError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&match self {
Self::HttpParse(e) => e.to_string(),
Self::Method(s) => format!("method {}", s),
Self::Path(s) => format!("path {}", s),
Self::Version(v) => format!("version {}", v),
Self::Header(s) => format!("header {}", s),
})
}
}

pub enum Invalid {
Uri(UriError),
Format,
}

impl std::fmt::Display for Invalid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"Invalid {}",
match self {
Self::Uri(e) => e.to_string(),
Self::Format => "http format".to_string(),
}
))
}
}

impl From<httparse::Error> for Invalid {
fn from(value: httparse::Error) -> Self {
UriError::HttpParse(value).into()
}
}

impl From<UriError> for Invalid {
fn from(value: UriError) -> Self {
Self::Uri(value)
}
}

pub enum Error {
IO(std::io::Error),
Parse(Invalid),
Sync,
}

impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Self::IO(value)
}
}

impl<T: Into<Invalid>> From<T> for Error {
fn from(value: T) -> Self {
Self::Parse(value.into())
}
}

impl<T> From<std::sync::PoisonError<std::sync::RwLockReadGuard<'_, T>>> for Error {
fn from(_: std::sync::PoisonError<std::sync::RwLockReadGuard<'_, T>>) -> Self {
Self::Sync
}
}

impl<T> From<std::sync::PoisonError<std::sync::RwLockWriteGuard<'_, T>>> for Error {
fn from(_: std::sync::PoisonError<std::sync::RwLockWriteGuard<'_, T>>) -> Self {
Self::Sync
}
}

impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error {
fn from(_: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self {
Self::Sync
}
}

impl From<Box<dyn std::any::Any + Send>> for Error {
fn from(_: Box<dyn std::any::Any + Send>) -> Self {
Self::Sync
}
}

impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self, f)
}
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&match self {
Self::IO(e) => e.to_string(),
Self::Parse(e) => e.to_string(),
Self::Sync => "An unexpected poison error has occurred".to_string(),
})
}
}

impl std::error::Error for Error {}
153 changes: 153 additions & 0 deletions src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
mod cfg;
mod consts;
mod err;
mod sync;
mod utils;

use consts::*;
use sync::*;
use utils::*;

pub use cfg::*;
pub use err::*;

use std::{
collections::HashMap,
io::Write,
net::{SocketAddr, TcpListener, TcpStream},
thread::{sleep, JoinHandle},
};

/// Automatically remove any disconnected clients.
fn error_handler(
streams: CondLock<HashMap<SocketAddr, TcpStream>>,
disconnected: CondLock<Vec<SocketAddr>>,
) -> JoinHandle<Result<()>> {
init_handler(move || {
// wait for a connection to be lost
disconnected.wait()?;

{
// acquire and hold onto the write guards
let mut gw_disconnected = disconnected.write()?;
let mut gw_streams = streams.write()?;

// remove every disconnected stream
while let Some(ip) = gw_disconnected.pop() {
gw_streams.remove(&ip);
}
}
// update the predicate of `streams` so the main thread
// knows when to pause due to there being no connections
let is_empty = streams.read()?.is_empty();
*streams.lock()? = !is_empty;

// reset the boolean predicate
*disconnected.lock()? = false;
Ok(())
})
}

/// Validate and instantiate streams into the system.
fn incoming_handler(
server: TcpListener,
streams: CondLock<HashMap<SocketAddr, TcpStream>>,
path: &str,
) -> JoinHandle<Result<()>> {
let path = path.to_owned();

init_handler(move || {
// handle any potential stream waiting to be accepted by the server
let (mut stream, addr) = server.accept()?;

// determine the authenticity of the stream
verify_stream(&stream, &path)?;

// setup the client's terminal
stream.write_all(INIT)?;

// add the stream to the map
streams.write()?.insert(addr, stream);

// notify `streams` of a new connection
*streams.lock()? = true;
streams.notify();
Ok(())
})
}

/// Distribute each frame to every stream.
fn dist_handler(
streams: &CondLock<HashMap<SocketAddr, TcpStream>>,
disconnected: &CondLock<Vec<SocketAddr>>,
frames: &[Vec<u8>],
) -> Result<()> {
// wait if and only if there are no connections
streams.wait()?;

// distribute the frames to each client
for frame in frames {
// discontinue distributing frames and pause
// this thread if there are no connections
if !*streams.lock()? {
break;
}

// the number of disconnections
let res = {
// acquire the writing guard of `disconnected`.
// This doesn't cause a deadlock because `disconnected` is
// only ever externally accessed after the end of this scope,
// which is covered as this guard gets automatically dropped
let mut g = disconnected.write()?;

// send each stream the current frame
for (ip, mut stream) in streams.read()?.iter() {
// remove the client if they have disconnected
if stream.write_all(frame).is_err() {
g.push(*ip);
}
}
// determinant for whether there have been any disconnections
g.len() > 0
};

// notify `disconnected` due to a disconnection
if res {
*disconnected.lock()? = true;
disconnected.notify();
}

// the pause between each frame
sleep(DELAY)
}
Ok(())
}

/// Inititate program.
pub fn init() -> Result<()> {
// parse program arguments
let cfg = Config::new();

// init listener
let server = TcpListener::bind(cfg.addr())?;

// connected clients
let streams = CondLock::default();

// disconnected clients
let disconnected = CondLock::default();

// init handlers
error_handler(streams.clone(), disconnected.clone());
incoming_handler(server, streams.clone(), cfg.path());

// obtain frames
let frames = donuts();

println!("Listening @ http://{}{}\n", cfg.addr(), cfg.path());

// Distribute frames to each client as long as there is at least one connection.
// Otherwise, the thread remains paused.
loop_func(move || dist_handler(&streams, &disconnected, &frames))
}
43 changes: 43 additions & 0 deletions src/base/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};

use super::Result;

/// A shared state, conveniently bundled with its respective condition variable
#[derive(Default)]
pub struct CondLock<T>(Arc<(RwLock<T>, Mutex<bool>, Condvar)>);

impl<T> CondLock<T> {
/// Acquires the mutex for the boolean predicate
pub fn lock(&self) -> Result<MutexGuard<bool>> {
self.0 .1.lock().map_err(Into::into)
}

/// Blocks the current thread until the condition variable receives a notification
pub fn wait(&self) -> Result<()> {
let cvar = &self.0 .2;
let mut started = self.lock()?;
while !*started {
started = cvar.wait(started)?;
}
Ok(())
}

/// Wakes up one blocked thread on the condition variable
pub fn notify(&self) {
self.0 .2.notify_one()
}
}

impl<T> Clone for CondLock<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> std::ops::Deref for CondLock<T> {
type Target = RwLock<T>;

fn deref(&self) -> &Self::Target {
&self.0 .0
}
}
Loading

0 comments on commit f9088db

Please sign in to comment.