Skip to content

Commit

Permalink
Complete IPC between core threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
Reboot-Codes committed Sep 26, 2024
1 parent 276dfb4 commit 922f2d5
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 170 deletions.
111 changes: 13 additions & 98 deletions clover-hub/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 26 additions & 13 deletions clover-hub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,39 @@ version = "1.0.0"
edition = "2021"

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
tokio = { version = "1.28", features = ["macros", "sync", "rt-multi-thread"] }
tokio-stream = "0.1.14"
warp = "0.3"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false }
uuid = { version = "1.8.0", features = ["serde", "v4"] }
# CLI
crossterm = { version = "0.19", features = [ "serde" ] }
chrono = { version = "0.4", features = ["serde"] }
rand = { version = "0.7.3", default-features = false, features = ["std"] }
tui = { version = "0.14", default-features = false, features = ['crossterm', 'serde'] }
thiserror = "1.0"
clap = { version = "4.5.4", features = ["derive"] }

# Debugging Tools
log = "0.4.8"
env_logger = "0.11.3"
fast_websocket_client = "0.2.0"
eventbus = "0.5.1"

# Utils that we can't be bothered to write.
chrono = { version = "0.4", features = ["serde"] }
regex = "1.10.4"
uuid = { version = "1.8.0", features = ["serde", "v4"] }
thiserror = "1.0"

# HTTP/WS
api_key = "0.1.0"
url = "2.5.0"
fast_websocket_client = "0.2.0"
warp = "0.3"

# Graphics
raylib = "5.0.2"
egl = "0.2.7"

# Adding stuff to rust that shoulda been there in the first place.
async-recursion = "1.1.1"
tokio = { version = "1.28", features = ["macros", "sync", "rt-multi-thread"] }
tokio-stream = "0.1.14"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false }

# Crypto
rand = "0.8.5"

6 changes: 4 additions & 2 deletions clover-hub/src/server/appd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::sync::Arc;
use log::info;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::evtbuzz::models::{IPCMessageWithId, Store};
use super::evtbuzz::models::{IPCMessageWithId, CoreUserConfig, Store};

pub async fn appd_main(ipc_tx: UnboundedSender<IPCMessageWithId>, ipc_rx: UnboundedReceiver<IPCMessageWithId>, store: Arc<Store>) {
pub async fn appd_main(ipc_tx: UnboundedSender<IPCMessageWithId>, ipc_rx: UnboundedReceiver<IPCMessageWithId>, store: Arc<Store>, user_config: Arc<CoreUserConfig>) {
info!("Starting AppDaemon...");
// TODO: Add docker crate to manage applications.
}
7 changes: 4 additions & 3 deletions clover-hub/src/server/arbiter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;
use log::info;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::evtbuzz::models::{IPCMessageWithId, Store};

pub async fn arbiter_main(ipc_tx: UnboundedSender<IPCMessageWithId>, ipc_rx: UnboundedReceiver<IPCMessageWithId>, store: Arc<Store>) {
use super::evtbuzz::models::{IPCMessageWithId, CoreUserConfig, Store};

pub async fn arbiter_main(ipc_tx: UnboundedSender<IPCMessageWithId>, ipc_rx: UnboundedReceiver<IPCMessageWithId>, store: Arc<Store>, user_config: Arc<CoreUserConfig>) {
info!("Starting Arbiter...");
}
21 changes: 18 additions & 3 deletions clover-hub/src/server/evtbuzz/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use warp::{Filter, http::StatusCode};
use crate::utils::iso8601;
use crate::server::evtbuzz::models::{ApiKeyWithKey, ClientWithId, IPCMessageWithId, Session, Store, UserWithId};
use crate::server::evtbuzz::models::{ApiKeyWithKey, ClientWithId, IPCMessageWithId, Session, Store, UserWithId, CoreUserConfig};
use crate::server::evtbuzz::websockets::handle_ws_client;

// example error response
Expand Down Expand Up @@ -148,10 +148,23 @@ pub struct ServerHealth {
up_since: String
}

pub async fn evtbuzz_listener(port: u16, ipc_tx: UnboundedSender<IPCMessageWithId>, mut ipc_rx: UnboundedReceiver<IPCMessageWithId>, store: Arc<Store>) {
info!("Starting HTTP and WebSocket Server on port: {}...", port);
pub async fn evtbuzz_listener(
port: u16,
ipc_tx: UnboundedSender<IPCMessageWithId>,
mut ipc_rx: UnboundedReceiver<IPCMessageWithId>,
store: Arc<Store>,
mut arbiter_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut renderer_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut modman_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut inference_engine_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
mut appd_ipc_rx: UnboundedReceiver<IPCMessageWithId>,
evtbuzz_user_config: Arc<CoreUserConfig>
) {
info!("Starting EvtBuzz on port: {}...", port);

let clients_tx: Arc<Mutex<HashMap<String, UnboundedSender<IPCMessageWithId>>>> = Arc::new(Mutex::new(HashMap::new()));
// TODO: Register core clients.

let (from_client_tx, mut from_client_rx) = mpsc::unbounded_channel::<IPCMessageWithId>();

let filter_to_clients_tx = Arc::new(clients_tx.clone());
Expand Down Expand Up @@ -254,6 +267,7 @@ pub async fn evtbuzz_listener(port: u16, ipc_tx: UnboundedSender<IPCMessageWithI
None => {
error!("DANGER! Client: {}, had API key removed from store without closing connection on removal, THIS IS BAD; please report this! Closing connection...", client_id.clone());

// TODO: Turn message ID generation loop into a utility function.
let message_id = loop {
let message_id = Uuid::new_v4().to_string();
match ipc_dispatch_store.messages.lock().await.get(&message_id.clone()) {
Expand All @@ -265,6 +279,7 @@ pub async fn evtbuzz_listener(port: u16, ipc_tx: UnboundedSender<IPCMessageWithI
}
}
};
// TODO: Create a Util function for core users to send messages with their MasterUserConfig
let generated_message = IPCMessageWithId {
id: message_id.clone(),
author: "hub:server".to_string(),
Expand Down
Loading

0 comments on commit 922f2d5

Please sign in to comment.