Skip to content

Commit

Permalink
Convert message queues (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcdeimos authored Aug 29, 2022
1 parent 2a8e6b0 commit b335b88
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 170 deletions.
44 changes: 22 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions rust/bin/acars_router/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "acars_router"
version = "1.0.10"
version = "1.0.11"
edition = "2021"
authors = ["Fred Clausen", "Mike Nye", "Alex Austin"]
description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers."
Expand All @@ -14,8 +14,8 @@ license = "MIT"
log = "0.4.17"
tokio = { version = "1.20.1", features = ["full", "tracing"] }
chrono = "0.4.22"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "1.0.85"
failure = "0.1.8"
acars_config = { path = "../../libraries/acars_config" }
acars_logging = { path = "../../libraries/acars_logging" }
Expand Down
4 changes: 2 additions & 2 deletions rust/libraries/acars_connection_manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "acars_connection_manager"
version = "1.0.10"
version = "0.1.2"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -15,5 +15,5 @@ futures = "0.3.23"
async-trait = "0.1.57"
zmq = "0.9.2"
tmq = "0.3.2"
acars_vdlm2_parser = { git = "https://github.com/jcdeimos/acars_vdlm2_parser", version = "0.1.5" }
acars_vdlm2_parser = { git = "https://github.com/jcdeimos/acars_vdlm2_parser", version = "0.1.6" }
acars_config = { path = "../acars_config" }
3 changes: 2 additions & 1 deletion rust/libraries/acars_connection_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::fmt;
use std::fmt::Formatter;
use std::net::SocketAddr;
use std::time::Duration;
use acars_vdlm2_parser::AcarsVdlm2Message;
use stubborn_io::ReconnectOptions;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
Expand All @@ -41,7 +42,7 @@ pub(crate) struct SenderServer<T> {
pub(crate) host: String,
pub(crate) proto_name: String,
pub(crate) socket: T,
pub(crate) channel: Receiver<String>,
pub(crate) channel: Receiver<AcarsVdlm2Message>,
}

#[derive(Debug, Default)]
Expand Down
68 changes: 36 additions & 32 deletions rust/libraries/acars_connection_manager/src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
// Full license information available in the project LICENSE file.
//

use std::collections::VecDeque;
use std::env;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
use std::collections::hash_map::DefaultHasher;
use std::collections::vec_deque::VecDeque;
use std::hash::{Hash, Hasher};
use acars_vdlm2_parser::{AcarsVdlm2Message, DecodeMessage, MessageResult};
use acars_config::Input;
Expand Down Expand Up @@ -56,22 +56,22 @@ impl MessageHandlerConfig {
}
}

pub(crate) async fn watch_message_queue(self, mut input_queue: Receiver<String>, output_queue: Sender<String>) {
pub(crate) async fn watch_message_queue(self, mut input_queue: Receiver<String>, output_queue: Sender<AcarsVdlm2Message>) {
let dedupe_queue: Arc<Mutex<VecDeque<(u64, u64)>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(100)));
let total_messages_processed = Arc::new(Mutex::new(0));
let total_messages_since_last = Arc::new(Mutex::new(0));
let queue_type_stats = self.queue_type.clone();
let queue_type_dedupe = self.queue_type.clone();
let stats_every = self.stats_every * 60; // Value has to be in seconds. Input is in minutes.
let version = env!("CARGO_PKG_VERSION");
let total_messages_processed: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let total_messages_since_last: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let queue_type_stats: String = self.queue_type.clone();
let queue_type_dedupe: String = self.queue_type.clone();
let stats_every: u64 = self.stats_every * 60; // Value has to be in seconds. Input is in minutes.
let version: &str = env!("CARGO_PKG_VERSION");

// Generate an async loop that sleeps for the requested stats print duration and then logs
// Give it the context for the counters
// The stats values to the console.

let stats_total_messages_context = Arc::clone(&total_messages_processed);
let stats_total_messages_since_last_context = Arc::clone(&total_messages_since_last);
let stats_total_messages_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_processed);
let stats_total_messages_since_last_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_since_last);

tokio::spawn(async move {
print_stats(
Expand All @@ -86,8 +86,8 @@ impl MessageHandlerConfig {
// Give it the context for the dedupe queue
// The dedupe queue to be cleaned.
if self.dedupe {
let dedupe_queue_context = Arc::clone(&dedupe_queue);
let dedupe_window = self.dedupe_window;
let dedupe_queue_context: Arc<Mutex<VecDeque<(u64,u64)>>> = Arc::clone(&dedupe_queue);
let dedupe_window: u64 = self.dedupe_window;

tokio::spawn(async move {
clean_up_dedupe_queue(
Expand All @@ -101,9 +101,9 @@ impl MessageHandlerConfig {
while let Some(message_content) = input_queue.recv().await {
// Grab the mutexes for the stats counter and increment the total messages processed by the message handler.
let parse_message: MessageResult<AcarsVdlm2Message> = message_content.decode_message();
let stats_total_loop_context = Arc::clone(&total_messages_processed);
let stats_total_loop_since_last_context = Arc::clone(&total_messages_since_last);
let dedupe_queue_loop = Arc::clone(&dedupe_queue);
let stats_total_loop_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_processed);
let stats_total_loop_since_last_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_since_last);
let dedupe_queue_loop: Arc<Mutex<VecDeque<(u64,u64)>>> = Arc::clone(&dedupe_queue);
*stats_total_loop_since_last_context.lock().await += 1;
*stats_total_loop_context.lock().await += 1;

Expand All @@ -113,12 +113,12 @@ impl MessageHandlerConfig {
Err(parse_error) => error!("[Message Handler {}] Failed to parse received message: {}\nReceived: {}", self.queue_type, parse_error, message_content),
Ok(mut message) => {

let current_time = match SystemTime::now().duration_since(UNIX_EPOCH) {
let current_time: f64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_secs_f64(),
Err(_) => f64::default(),
};

let get_message_time = message.get_time();
let get_message_time: Option<f64> = message.get_time();

match get_message_time {
None => {
Expand All @@ -141,16 +141,16 @@ impl MessageHandlerConfig {
}

// Time to hash the message
let hash_message = hash_message(message.clone());
let hash_message: MessageResult<u64> = hash_message(message.clone());

match hash_message {
Err(hash_parsing_error) => error!("[Message Handler {}] Failed to create hash of message: {}",
self.queue_type, hash_parsing_error),
Ok(hashed_value) => {
if self.dedupe {
let mut rejected = false;
let mut rejected: bool = false;
for (time, hashed_value_saved) in dedupe_queue_loop.lock().await.iter() {
let f64_time = *time as f64;
let f64_time: f64 = *time as f64;
if *hashed_value_saved == hashed_value && current_time - f64_time < self.dedupe_window as f64
// Both the time and hash have to be equal to reject the message
{
Expand Down Expand Up @@ -181,18 +181,22 @@ impl MessageHandlerConfig {
trace!("[Message Handler {}] Hashed value: {}", self.queue_type, hashed_value);
trace!("[Message Handler {}] Final message: {:?}", self.queue_type, message);

let parse_final_message: MessageResult<String> = message.to_string();
match parse_final_message {
Err(parse_error) => error!("{}", parse_error),
Ok(final_message) => {
// Send to the output methods for emitting on the network
debug!("[Message Handler {}] Message to be sent: {}", self.queue_type, &final_message);
match output_queue.send(final_message).await {
Ok(_) => debug!("[Message Handler {}] Message sent to output queue", self.queue_type),
Err(e) => error!("[Message Handler {}] Error sending message to output queue: {}", self.queue_type, e)
};
}
match output_queue.send(message).await {
Ok(_) => debug!("[Message Handler {}] Message sent to output queue", self.queue_type),
Err(e) => error!("[Message Handler {}] Error sending message to output queue: {}", self.queue_type, e)
}
// let parse_final_message: MessageResult<String> = message.to_string();
// match parse_final_message {
// Err(parse_error) => error!("{}", parse_error),
// Ok(final_message) => {
// // Send to the output methods for emitting on the network
// debug!("[Message Handler {}] Message to be sent: {}", self.queue_type, &final_message);
// match output_queue.send(final_message).await {
// Ok(_) => debug!("[Message Handler {}] Message sent to output queue", self.queue_type),
// Err(e) => error!("[Message Handler {}] Error sending message to output queue: {}", self.queue_type, e)
// };
// }
// }
}
}
}
Expand Down Expand Up @@ -253,7 +257,7 @@ fn hash_message(mut message: AcarsVdlm2Message) -> MessageResult<u64> {
message.clear_noise_level(); // Clears out "vdl2.noise_level"
message.clear_octets_corrected_by_fec(); // Clears out "vdl2.octets_corrected_by_fec"
message.clear_sig_level(); // Clears out "vdl2.sig_level"
let parse_msg = message.to_string();
let parse_msg: MessageResult<String> = message.to_string();
match parse_msg {
Err(parse_error) => Err(parse_error),
Ok(msg) => {
Expand Down
10 changes: 5 additions & 5 deletions rust/libraries/acars_connection_manager/src/packet_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl ProcessAssembly for Option<AcarsVdlm2Message> {
async fn process_reassembly(&self, proto_name: &str, channel: &Sender<String>, listener_type: &str) {
match self {
Some(reassembled_msg) => {
let parsed_msg: MessageResult<String> = reassembled_msg.to_string();
let parsed_msg: MessageResult<String> = reassembled_msg.to_string_newline();
match parsed_msg {
Err(parse_error) => error!("{}", parse_error),
Ok(msg) => {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl PacketHandler {
}

let mut output_message: Option<AcarsVdlm2Message> = None;
let mut message_for_peer = String::new();
let mut message_for_peer: String = String::new();
let mut old_time: Option<f64> = None; // Save the time of the first message for this peer

// TODO: Handle message reassembly for out of sequence messages
Expand Down Expand Up @@ -110,7 +110,7 @@ impl PacketHandler {
// than the reassembly window. Therefore we use the old_time we grabbed from the queue above, or if it's the first
// message we get the current time.

let message_queue_time = match old_time {
let message_queue_time: f64 = match old_time {
Some(t) => t,
None => match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_secs_f64(),
Expand All @@ -126,7 +126,7 @@ impl PacketHandler {
}

pub async fn clean_queue(&self) {
let current_time = match SystemTime::now().duration_since(UNIX_EPOCH) {
let current_time: f64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_secs_f64(),
Err(_) => 0.0,
};
Expand All @@ -138,7 +138,7 @@ impl PacketHandler {

self.queue.lock().await.retain(|peer, old_messages| {
let (time, _) = old_messages;
let time_diff = current_time - *time;
let time_diff: f64 = current_time - *time;
if time_diff > self.reassembly_window {
debug!("[UDP SERVER {}] Peer {peer} has been idle for {time_diff} seconds, removing from queue", self.name);
false
Expand Down
Loading

0 comments on commit b335b88

Please sign in to comment.