-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.rs
127 lines (107 loc) · 3.69 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//!
//! Combines all individual eiffelvis_* libraries into the final product.
//!
//! Its main responsibilities are owning the async executor and setting the initial parameters through the CLI.
//!
use std::{sync::Arc, time::Duration};
use eiffelvis_core::{domain::app::EiffelVisApp, graph_storage::ChunkedGraph};
use eiffelvis_http::AppData;
use structopt::StructOpt;
use tracing::{info, warn};
/// Command line options
// NOTE: structopt turns the `///` comments in this struct into the `--help` text, so they are
// kept short and user-facing; maintainer notes are written as plain `//` comments instead.
#[derive(StructOpt, Debug)]
#[structopt(name = "EiffelVis")]
struct Cli {
    /// HTTP host address
    #[structopt(short, long, default_value = "127.0.0.1")]
    address: String,

    /// HTTP host port
    #[structopt(short, long, default_value = "3001")]
    port: u16,

    /// AMQP URI
    #[structopt(short = "r", long, default_value = "amqp://localhost:5672/%2f")]
    rmq_uri: String,

    /// AMQP queue
    #[structopt(short = "q", long, default_value = "hello")]
    rmq_queue: String,

    /// AMQP reconnect timeout in seconds
    // `main` passes this value to `Duration::from_secs` before retrying the event stream.
    // The default used to be "3001" (apparently copied from `port`), i.e. a ~50 minute pause,
    // while the accompanying log message talks about 5 seconds; the default now matches it.
    #[structopt(short = "t", long, default_value = "5")]
    timeout: u64,

    /// Maximum amount of chunks stored in memory
    #[structopt(long, default_value = "8")]
    max_chunks: usize,

    /// Maximum amount of events a single chunk will hold
    #[structopt(long, default_value = "128")]
    chunk_size: u32,

    /// Path to TLS certificate pem file
    // Only forwarded to the HTTP server when `tls_key` is given as well (`main` zips the two).
    #[structopt(long)]
    tls_cert: Option<String>,

    /// Path to TLS private key pem file
    // Only forwarded to the HTTP server when `tls_cert` is given as well (`main` zips the two).
    #[structopt(long)]
    tls_key: Option<String>,
}
/// Starts all the services that make up EiffelVis.
///
/// In order, this:
/// 1. initialises logging, defaulting `RUST_LOG` to `info` when it is unset,
/// 2. parses the command line into [`Cli`],
/// 3. spawns the HTTP server on the shared application data,
/// 4. connects to the AMQP stream and spawns a task that pushes every received event
///    into the shared graph,
/// 5. spawns a task that asks the HTTP server to shut down gracefully on CTRL+C.
///
/// It then waits until either the event task or the HTTP server task finishes.
///
/// # Panics
///
/// Panics if the host address cannot be parsed, if the initial AMQP connection fails,
/// or if the result of whichever task finishes first cannot be unwrapped.
#[tokio::main]
async fn main() {
    // Make sure something is logged even when the user did not configure a filter.
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }
    tracing_subscriber::fmt::init();

    let cli = Cli::from_args();

    // State shared between the HTTP server and the event ingestion task.
    let graph = Arc::new(AppData {
        heuristic: 0.into(),
        graph: tokio::sync::RwLock::new(ChunkedGraph::new(cli.max_chunks, cli.chunk_size)),
    });

    let http_server_handle = eiffelvis_http::Handle::new();
    let http_server = tokio::spawn(eiffelvis_http::app(
        graph.clone(),
        // The address comes straight from the command line, so tell the user what went wrong
        // instead of panicking with a bare `unwrap`.
        cli.address
            .parse()
            .expect("--address is not a valid host address"),
        cli.port,
        http_server_handle.clone(),
        // TLS settings are only passed on when both the certificate and the key were supplied.
        cli.tls_cert.zip(cli.tls_key),
    ));

    let mut event_parser = eiffelvis_stream::ampq::AmpqStream::new(
        cli.rmq_uri.into(),
        cli.rmq_queue.into(),
        "eiffelvis".into(),
    )
    .await
    .expect("Failed to connect to ampq server");

    let timeout = cli.timeout;
    let event_parser = tokio::spawn(async move {
        loop {
            match event_parser.next().await {
                Some(bytes) => match serde_json::from_slice(&bytes) {
                    Ok(des) => {
                        let mut lk = graph.graph.write().await;
                        if EiffelVisApp::push(&mut *lk, des) {
                            // Counter is bumped once per successfully stored event.
                            graph
                                .heuristic
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            info!("size: {}", lk.node_count());
                        } else {
                            warn!("Failed to push graph! maybe a duplicate event?");
                        }
                    }
                    // Surface the reason instead of silently discarding the error.
                    Err(err) => {
                        warn!("Received new message but failed to deserialize: {}", err);
                    }
                },
                None => {
                    // Report the delay that is actually used; it is configurable via `--timeout`
                    // and the message previously always claimed 5 seconds.
                    warn!(
                        "Event stream failed, sleeping for {} seconds to retry",
                        timeout
                    );
                    tokio::time::sleep(Duration::from_secs(timeout)).await;
                }
            }
        }
    });

    // On CTRL+C ask the HTTP server to stop; its task finishing ends the `select!` below.
    tokio::spawn(async move {
        shutdown_signal().await;
        http_server_handle.graceful_shutdown(None);
    });

    // Run until either long-lived task finishes, propagating a failure as a panic.
    tokio::select! {
        res = event_parser => res.unwrap(),
        res = http_server => res.unwrap().unwrap(),
    };
}
/// Completes once the process has received a CTRL+C signal.
///
/// # Panics
///
/// Panics if waiting on the CTRL+C signal reports an error.
#[doc(hidden)]
async fn shutdown_signal() {
    // Wait first, then check the outcome, so the two steps read separately.
    let outcome = tokio::signal::ctrl_c().await;
    outcome.expect("failed to install CTRL+C signal handler");
}