Skip to content

Commit

Permalink
Setup EvtBuzz's IPC dispatchers.
Browse files Browse the repository at this point in the history
  • Loading branch information
Reboot-Codes committed Sep 26, 2024
1 parent 922f2d5 commit 9b6ba9b
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 88 deletions.
2 changes: 2 additions & 0 deletions clover-hub/dev.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CLOVER_LOG="clover::server=debug" CLOVER_MASTER_PRINT="true" cargo run -- run server

189 changes: 141 additions & 48 deletions clover-hub/src/server/evtbuzz/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use warp::{Filter, http::StatusCode};
use crate::utils::iso8601;
use crate::server::evtbuzz::models::{ApiKeyWithKey, ClientWithId, IPCMessageWithId, Session, Store, UserWithId, CoreUserConfig};
use crate::utils::{gen_cid_with_check, gen_ipc_message, gen_message_id_with_check, iso8601};
use crate::server::evtbuzz::models::{ApiKeyWithKey, Client, ClientWithId, CoreUserConfig, IPCMessageWithId, Session, Store, UserWithId};
use crate::server::evtbuzz::websockets::handle_ws_client;

// example error response
Expand Down Expand Up @@ -69,17 +69,7 @@ async fn handle_rejection(err: warp::reject::Rejection) -> std::result::Result<i

// middleware that looks for authorization header and validates it
async fn ensure_authentication(path: String, store: Arc<Arc<Store>>, auth_header: Option<String>) -> Result<(UserWithId, ApiKeyWithKey, ClientWithId, Session), warp::reject::Rejection> {
let client_id = loop {
let client_id = Uuid::new_v4().to_string();
match store.clients.lock().await.get(&client_id.clone()) {
Some(_) => {
debug!("Client: {}, exists, retrying...", client_id.clone());
},
None => {
break client_id;
}
}
};
let client_id = gen_cid_with_check(&store).await;
let mut client = ClientWithId { api_key: "".to_string(), user_id: "".to_string(), id: client_id.clone(), active: true };
store.clients.lock().await.insert(client_id.clone(), client.clone().into());

Expand Down Expand Up @@ -148,22 +138,78 @@ pub struct ServerHealth {
up_since: String
}

async fn handle_ipc_send(sender: &mpsc::UnboundedSender<IPCMessageWithId>, msg: IPCMessageWithId, user_config: &Arc<CoreUserConfig>, store: &Store) {
let users_mutex = &store.users.to_owned();
let users = users_mutex.lock().await;
let user_conf = users.get(&user_config.id.clone()).expect(&format!("ERROR: Core user not found: {}", user_config.id.clone()));
let keys_mutex = &store.api_keys.to_owned();
let keys = keys_mutex.lock().await;
let api_key_conf = keys.get(&user_config.api_key.clone()).expect(&format!("ERROR: Core user api_key not found: {}", user_config.api_key.clone()));
let mut event_sent = false;

for allowed_event_regex in api_key_conf.allowed_events_from.clone() {
match Regex::new(&allowed_event_regex.clone()) {
Ok(regex) => {
if regex.is_match(&msg.kind.clone()) {
match sender.send(msg.clone()) {
Ok(_) => {
event_sent = true;
},
Err(e) => {
error!("Core user: {}, IPC channel: {}, Failed to send message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}, due to:\n{}", user_config.id.clone(), user_conf.user_type.clone(), msg.author.clone(), msg.kind.clone(), msg.message.clone(), e);
}
};
}
},
Err(e) => {
error!("Core user: {}, api key's \"allowed events from\", regex: {}, is invalid! Regex Error: {}", user_config.id.clone(), allowed_event_regex.clone(), e);
}
}
}

if !event_sent {
debug!("Core user: {}, event \"{}\" not sent.", user_config.id, msg.kind.clone());
}
}

pub async fn evtbuzz_listener(
port: u16,
ipc_tx: UnboundedSender<IPCMessageWithId>,
mut ipc_rx: UnboundedReceiver<IPCMessageWithId>,
store: Arc<Store>,
mut arbiter_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut renderer_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut modman_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut inference_engine_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut appd_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut arbiter_ipc: (&CoreUserConfig, UnboundedReceiver<IPCMessageWithId>),
mut renderer_ipc: (&CoreUserConfig, UnboundedReceiver<IPCMessageWithId>),
mut modman_ipc: (&CoreUserConfig, UnboundedReceiver<IPCMessageWithId>),
mut inference_engine_ipc: (&CoreUserConfig, UnboundedReceiver<IPCMessageWithId>),
mut appd_ipc: (&CoreUserConfig, UnboundedReceiver<IPCMessageWithId>),
evtbuzz_user_config: Arc<CoreUserConfig>
) {
info!("Starting EvtBuzz on port: {}...", port);

let clients_tx: Arc<Mutex<HashMap<String, UnboundedSender<IPCMessageWithId>>>> = Arc::new(Mutex::new(HashMap::new()));
// TODO: Register core clients.

let arbiter_cfg = arbiter_ipc.0;
let renderer_cfg = renderer_ipc.0;
let modman_cfg = modman_ipc.0;
let inference_engine_cfg = inference_engine_ipc.0;
let appd_cfg = appd_ipc.0;
for client in vec![
arbiter_cfg,
renderer_cfg,
modman_cfg,
inference_engine_cfg,
appd_cfg
] {
let cid = gen_cid_with_check(&store).await;
store.clients.lock().await.insert(
cid,
Client {
api_key: client.api_key.clone(),
user_id: client.id.clone(),
active: true
}
);
}

let (from_client_tx, mut from_client_rx) = mpsc::unbounded_channel::<IPCMessageWithId>();

Expand Down Expand Up @@ -205,15 +251,15 @@ pub async fn evtbuzz_listener(

let server_port = Arc::new(port.clone());
let http_handle = tokio::task::spawn(async move {
info!("Starting server!");
warp::serve(routes)
// TODO: Add option for listening address.
.try_bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), *server_port)).await;
});

let ipc_dispatch_store = Arc::new(store.clone());
let ipc_dispatch_clients_tx = Arc::new(clients_tx.clone());
let ipc_dispatch_user_config = Arc::new(evtbuzz_user_config.clone());
let ipc_dispatch_handle = tokio::task::spawn(async move {
info!("Starting IPC Dispatch!");
while let Some(message) = ipc_rx.recv().await {
debug!("Got message type: {}, with data:\n {}", message.kind.clone(), message.message.clone());
for client in ipc_dispatch_store.clients.lock().await.clone().into_iter() {
Expand Down Expand Up @@ -267,31 +313,20 @@ pub async fn evtbuzz_listener(
None => {
error!("DANGER! Client: {}, had API key removed from store without closing connection on removal, THIS IS BAD; please report this! Closing connection...", client_id.clone());

// TODO: Turn message ID generation loop into a utility function.
let message_id = loop {
let message_id = Uuid::new_v4().to_string();
match ipc_dispatch_store.messages.lock().await.get(&message_id.clone()) {
Some(_) => {
debug!("Message: {}, exists, retrying...", message_id.clone());
},
None => {
break message_id;
}
}
};
// TODO: Create a Util function for core users to send messages with their MasterUserConfig
let generated_message = IPCMessageWithId {
id: message_id.clone(),
author: "hub:server".to_string(),
kind: Url::parse("clover://hub/server/listener/clients/unauthorize")
.unwrap()
.query_pairs_mut()
.append_pair("id", &client_id.clone())
.finish()
.to_string(),
message: "api key removed from store".to_string()
};
ipc_dispatch_store.messages.lock().await.insert(message_id.clone(), generated_message.clone().into());
let kind = Url::parse("clover://hub/server/listener/clients/unauthorize")
.unwrap()
.query_pairs_mut()
.append_pair("id", &client_id.clone())
.finish()
.to_string();

let generated_message = gen_ipc_message(
&ipc_dispatch_store.clone(),
&ipc_dispatch_user_config.clone(),
kind,
"api key removed from store".to_string()
).await;
ipc_dispatch_store.messages.lock().await.insert(generated_message.id.clone(), generated_message.clone().into());

let _ = client_sender.send(generated_message.clone());
}
Expand All @@ -308,7 +343,7 @@ pub async fn evtbuzz_listener(
}
});

//let ipc_receive_store = Arc::new(store.clone());
// IPC Handle for data from WS clients.
let ipc_receive_handle = tokio::task::spawn(async move {
while let Some(msg) = from_client_rx.recv().await {
debug!("Got message: {{ \"author\": \"{}\", \"kind\": \"{}\", \"message\": \"{}\" }}", msg.author.clone(), msg.kind.clone(), msg.message.clone());
Expand All @@ -321,5 +356,63 @@ pub async fn evtbuzz_listener(
}
});

futures::future::join_all(vec![http_handle, ipc_dispatch_handle, ipc_receive_handle]).await;
// Internal IPC Handles
let from_arbiter_cfg = Arc::new(arbiter_cfg.clone());
let from_arbiter_store = Arc::new(store.clone());
let from_arbiter_tx = Arc::new(from_client_tx.clone());
let from_arbiter_handle = tokio::task::spawn(async move {
while let Some(msg) = arbiter_ipc.1.recv().await {
handle_ipc_send(&from_arbiter_tx, msg, &from_arbiter_cfg.clone(), &from_arbiter_store.clone()).await;
}
});

let from_renderer_cfg = Arc::new(renderer_cfg.clone());
let from_renderer_store = Arc::new(store.clone());
let from_renderer_tx = Arc::new(from_client_tx.clone());
let from_renderer_handle = tokio::task::spawn(async move {
while let Some(msg) = renderer_ipc.1.recv().await {
handle_ipc_send(&from_renderer_tx, msg, &from_renderer_cfg.clone(), &from_renderer_store.clone()).await;
}
});

let from_modman_cfg = Arc::new(modman_cfg.clone());
let from_modman_store = Arc::new(store.clone());
let from_modman_tx = Arc::new(from_client_tx.clone());
let from_modman_handle = tokio::task::spawn(async move {
while let Some(msg) = modman_ipc.1.recv().await {
handle_ipc_send(&from_modman_tx, msg, &from_modman_cfg.clone(), &from_modman_store.clone()).await;
}
});

let from_inference_engine_cfg = Arc::new(inference_engine_cfg.clone());
let from_inference_engine_store = Arc::new(store.clone());
let from_inference_engine_tx = Arc::new(from_client_tx.clone());
let from_inference_engine_handle = tokio::task::spawn(async move {
while let Some(msg) = inference_engine_ipc.1.recv().await {
handle_ipc_send(&from_inference_engine_tx, msg, &from_inference_engine_cfg.clone(), &from_inference_engine_store.clone()).await;
}
});

let from_appd_cfg = Arc::new(appd_cfg.clone());
let from_appd_store = Arc::new(store.clone());
let from_appd_tx = Arc::new(from_client_tx.clone());
let from_appd_handle = tokio::task::spawn(async move {
while let Some(msg) = appd_ipc.1.recv().await {
handle_ipc_send(&from_appd_tx, msg, &from_appd_cfg.clone(), &from_appd_store.clone()).await;
}
});

futures::future::join_all(vec![
http_handle,
ipc_dispatch_handle,
ipc_receive_handle,
from_arbiter_handle,
from_renderer_handle,
from_modman_handle,
from_inference_engine_handle,
from_appd_handle
]).await;

info!("Shutting down EvtBuzz...");
// TODO: Clean up registered sessions when server is shutting down.
}
20 changes: 13 additions & 7 deletions clover-hub/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn handle_ipc_send(sender: &mpsc::UnboundedSender<IPCMessageWithId>, msg:
}

pub async fn server_main(port: u16) {
info!("Running Backend Server Threads...");
info!("Starting CloverHub...");

let (
store,
Expand Down Expand Up @@ -124,21 +124,27 @@ pub async fn server_main(port: u16) {
let evtbuzz_port = Arc::new(port);
let evtbuzz_store = Arc::new(store.clone());
let evtbuzz_uca = Arc::new(evtbuzz_user_config.clone());
let evtbuzz_handler = tokio::task::spawn(async move {
let evtbuzz_arbiter_user_config_arc = Arc::new(arbiter_user_config.clone());
let evtbuzz_renderer_user_config_arc = Arc::new(renderer_user_config.clone());
let evtbuzz_modman_user_config_arc = Arc::new(modman_user_config.clone());
let evtbuzz_inference_engine_user_config_arc = Arc::new(inference_engine_user_config.clone());
let evtbuzz_appd_user_config_arc = Arc::new(appd_user_config.clone());
let evtbuzz_handler = tokio::task::spawn(async move {
evtbuzz_listener(
*evtbuzz_port.to_owned(),
evtbuzz_from_tx,
evtbuzz_to_rx,
evtbuzz_store.clone(),
arbiter_from_rx,
renderer_from_rx,
modman_from_rx,
inference_engine_from_rx,
appd_from_rx,
(&evtbuzz_arbiter_user_config_arc.clone(), arbiter_from_rx),
(&evtbuzz_renderer_user_config_arc.clone(), renderer_from_rx),
(&evtbuzz_modman_user_config_arc.clone(), modman_from_rx),
(&evtbuzz_inference_engine_user_config_arc.clone(), inference_engine_from_rx),
(&evtbuzz_appd_user_config_arc.clone(), appd_from_rx),
evtbuzz_uca.clone()
).await;
});

// Get messages from EvtBuzz (incl ones from the other threads), and pass them around. Yes, this does include looping events back into EvtBuzz.
let ipc_listener_dispatch_store = Arc::new(store.clone());
let ipc_from_listener_dispatch = tokio::task::spawn(async move {
while let Some(msg) = evtbuzz_from_rx.recv().await {
Expand Down
Loading

0 comments on commit 9b6ba9b

Please sign in to comment.