From ab48146e3e0b6e0c90b089db208640b79928eaf1 Mon Sep 17 00:00:00 2001 From: Ruthger Dijt Date: Mon, 10 Jan 2022 17:48:41 +0100 Subject: [PATCH] backend/http: add "intelligent" graph locking behavior --- .../crates/eiffelvis_core/src/domain/app.rs | 9 ++-- .../src/graph_storage/chunked_storage.rs | 15 ++++-- backend/crates/eiffelvis_http/src/handlers.rs | 53 +++++++++++++++---- backend/crates/eiffelvis_http/src/lib.rs | 12 ++++- backend/src/main.rs | 25 ++++++--- 5 files changed, 83 insertions(+), 31 deletions(-) diff --git a/backend/crates/eiffelvis_core/src/domain/app.rs b/backend/crates/eiffelvis_core/src/domain/app.rs index 8458534b..76ce8825 100644 --- a/backend/crates/eiffelvis_core/src/domain/app.rs +++ b/backend/crates/eiffelvis_core/src/domain/app.rs @@ -16,7 +16,7 @@ impl Key for Uuid {} pub trait EiffelVisApp: EiffelGraph { /// Inserts a new eiffel event into storage - fn push(&mut self, event: BaseEvent); + fn push(&mut self, event: BaseEvent) -> bool; /// Looks up the event of given id fn get_event(&self, id: Uuid) -> Option<&BaseEvent>; @@ -29,15 +29,14 @@ pub trait EiffelVisApp: EiffelGraph { } impl EiffelVisApp for G { - fn push(&mut self, event: BaseEvent) { + fn push(&mut self, event: BaseEvent) -> bool { let links = event.links.clone(); self.add_node_with_edges( event.meta.id, event, links.into_iter().map(|link| (link.target, link.link_type)), - ); - - println!("Graph size: {}", self.node_count()); + ) + .is_some() } fn get_event(&self, id: Uuid) -> Option<&BaseEvent> { diff --git a/backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs b/backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs index 3a8e046d..5d296467 100644 --- a/backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs +++ b/backend/crates/eiffelvis_core/src/graph_storage/chunked_storage.rs @@ -1,4 +1,4 @@ -use crate::graph; +use crate::graph::{self, Indexable}; use ahash::RandomState; use indexmap::IndexMap; use std::{ @@ -98,7 +98,12 @@ impl ChunkedGraph { } } - fn add_node(&mut self, key: K, data: N) -> ChunkedIndex { + fn add_node(&mut self, key: K, data: N) -> Option { + // TODO: decide if we want to be correct or not :/ + if self.get(key).is_some() { + return None; + } + if self.store[self.head_chunk()].len() >= self.max_elements() { self.newest_generation += 1; if self.chunks() < self.max_chunks() { @@ -116,10 +121,10 @@ impl ChunkedGraph { self.store[head_chunk].insert(key, Element(NodeData { data }, Vec::default())); - ChunkedIndex::new( + Some(ChunkedIndex::new( self.newest_generation, (self.store[head_chunk].len() - 1) as u32, - ) + )) } fn add_edge(&mut self, a: K, b: K, data: E) { @@ -321,7 +326,7 @@ impl<'a, K: graph::Key, N, E> graph::ItemIter for ChunkedGraph { impl<'a, K: graph::Key, N, E> graph::Graph for ChunkedGraph { fn add_node(&mut self, key: K, data: N) -> Option { - Some(self.add_node(key, data)) + self.add_node(key, data) } fn add_edge(&mut self, a: K, b: K, data: E) { diff --git a/backend/crates/eiffelvis_http/src/handlers.rs b/backend/crates/eiffelvis_http/src/handlers.rs index 9a0f1756..74077432 100644 --- a/backend/crates/eiffelvis_http/src/handlers.rs +++ b/backend/crates/eiffelvis_http/src/handlers.rs @@ -1,3 +1,5 @@ +use std::{collections::VecDeque, ops::Add}; + use crate::*; use serde::Serialize; @@ -32,7 +34,7 @@ pub(crate) fn make_service(app: App) -> Router { pub async fn event_dump( Extension(app): Extension>, ) -> impl IntoResponse { - let lk = app.read().await; + let lk = app.graph.read().await; let dump = lk.dump::<&BaseEvent>(); @@ -44,7 +46,7 @@ pub async fn get_event( Path(find_id): Path, Extension(app): Extension>, ) -> impl IntoResponse { - let lk = app.read().await; + let lk = app.graph.read().await; if let Some(event) = lk.get_event(find_id) { Json(event).into_response() } else { @@ -61,7 +63,7 @@ pub async fn events_with_root( Path(find_id): Path, Extension(app): Extension>, ) -> impl IntoResponse { - let lk = app.read().await; + let lk = app.graph.read().await; Json(lk.get_subgraph_with_roots::<&BaseEvent>(&[find_id])).into_response() } @@ -86,6 +88,15 @@ pub async fn establish_websocket( ws.on_upgrade(move |mut socket| async move { let mut interval = tokio::time::interval(std::time::Duration::from_millis(500)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let hist_max = 8; + let mut delta_hist = VecDeque::with_capacity(hist_max); + let mut last_heurstic = 0; + let mut heuristic_changed = false; + + delta_hist.push_back(0); + let mut req_handler: Option> = None; while let Ok(()) = tokio::select! { @@ -101,22 +112,42 @@ pub async fn establish_websocket( }; let res = QueryRes { repr: msg.clone(), error: res }; println!("Request {:?}", res); + heuristic_changed = true; socket.send(Message::Text(serde_json::to_string(&res).unwrap())).await.map_err(|_| ()) }, _ => Err(()) } }, _ = interval.tick() => { - if let Some(handler) = req_handler.as_mut() { - let events: Vec = handler.handle(&*app.read().await); - if !events.is_empty() { - socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ()) - } else { - Ok(()) + let heuristic = app.heuristic.load(std::sync::atomic::Ordering::Relaxed); + let average = delta_hist.iter().fold(0, Add::add) / delta_hist.len() as u64; + let delta = heuristic - last_heurstic; + + if last_heurstic != heuristic { + // the delta might be too high for us to lock the graph so we need to remember + heuristic_changed = true; + } + + let mut res = Ok(()); + if heuristic_changed && delta <= average { + if let Some(handler) = req_handler.as_mut() { + info!("locking graph!"); + let events: Vec = handler.handle(&*app.graph.read().await); + if !events.is_empty() { + res = socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ()) + } } - } else { - Ok(()) + + heuristic_changed = false; } + + if delta_hist.len() >= hist_max { + delta_hist.pop_front(); + } + delta_hist.push_back(delta); + last_heurstic = heuristic; + + res } } {} diff --git a/backend/crates/eiffelvis_http/src/lib.rs b/backend/crates/eiffelvis_http/src/lib.rs index 270779dc..1dfc9afb 100644 --- a/backend/crates/eiffelvis_http/src/lib.rs +++ b/backend/crates/eiffelvis_http/src/lib.rs @@ -14,7 +14,7 @@ use tracing::info; use std::{ io, net::{IpAddr, SocketAddr}, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, }; use tower_http::cors::{any, CorsLayer}; @@ -26,7 +26,15 @@ use eiffelvis_core::domain::{app::EiffelVisApp, types::BaseEvent}; pub trait EiffelVisHttpApp: EiffelVisApp + Send + Sync + 'static {} impl EiffelVisHttpApp for T where T: EiffelVisApp + Send + Sync + 'static {} -type App = Arc>; +pub struct AppData { + /// Graph to serve to the web + pub graph: tokio::sync::RwLock, + /// Heuristic used to determine when locking of graph should occur, + /// library consumers should increment this counter on a succesful push to the graph. + pub heuristic: AtomicU64, +} + +type App = Arc>; /// Takes an eiffelvis app and binds the http server on the given address. /// This is likely the only function you'll ever need to call. diff --git a/backend/src/main.rs b/backend/src/main.rs index a8bafd68..7babda72 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -7,8 +7,9 @@ use std::{sync::Arc, time::Duration}; use eiffelvis_core::{domain::app::EiffelVisApp, graph_storage::ChunkedGraph}; +use eiffelvis_http::AppData; use structopt::StructOpt; -use tracing::info; +use tracing::{info, warn}; /// Command line options #[derive(StructOpt, Debug)] @@ -61,10 +62,10 @@ async fn main() { let cli = Cli::from_args(); - let graph = Arc::new(tokio::sync::RwLock::new(ChunkedGraph::new( - cli.max_chunks, - cli.chunk_size, - ))); + let graph = Arc::new(AppData { + heuristic: 0.into(), + graph: tokio::sync::RwLock::new(ChunkedGraph::new(cli.max_chunks, cli.chunk_size)), + }); let http_server_handle = eiffelvis_http::Handle::new(); let http_server = tokio::spawn(eiffelvis_http::app( @@ -88,12 +89,20 @@ async fn main() { loop { if let Some(bytes) = event_parser.next().await { if let Ok(des) = serde_json::from_slice(&bytes) { - EiffelVisApp::push(&mut *graph.write().await, des); + let mut lk = graph.graph.write().await; + if EiffelVisApp::push(&mut *lk, des) { + graph + .heuristic + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + info!("size: {}", lk.node_count()); + } else { + warn!("Failed to push graph! maybe a duplicate event?") + } } else { - info!("Received new message but failed to deserialize"); + warn!("Received new message but failed to deserialize"); } } else { - info!("Event stream failed, sleeping for 5 seconds to retry"); + warn!("Event stream failed, sleeping for 5 seconds to retry"); tokio::time::sleep(Duration::from_secs(timeout)).await; } }