From b9fda6e4fbd314b47d15c49caad3a5b5bcbf1a5a Mon Sep 17 00:00:00 2001 From: cofob Date: Mon, 5 Aug 2024 03:05:51 +0400 Subject: [PATCH] Check instances in parallel --- fastside-actualizer/src/main.rs | 50 ++++++++++++++++--------- fastside-actualizer/src/services/mod.rs | 4 +- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/fastside-actualizer/src/main.rs b/fastside-actualizer/src/main.rs index 3a950ba2..3589fe3a 100644 --- a/fastside-actualizer/src/main.rs +++ b/fastside-actualizer/src/main.rs @@ -12,6 +12,7 @@ use fastside_shared::{ serde_types::{Service, StoredData}, }; use serde_types::ActualizerData; +use tokio::task::JoinSet; use utils::{log_err::LogErrResult, normalize::normalize_instances, tags::update_instance_tags}; mod serde_types; @@ -88,28 +89,42 @@ async fn check_instances( service: &mut Service, config: &CrawlerConfig, ) -> Result<()> { - let checker = services::get_instance_checker(name); let service_history = actualizer_data .services .entry(name.to_string()) .or_default(); let service_clone = service.clone(); - for instance in service.instances.iter_mut() { - info!("Checking instance: {}", instance.url); - let client = build_client(&service_clone, config, proxies, instance)?; - let is_alive = { - let res = checker - .check(client.clone(), &service_clone, instance) - .await; - match res { - Ok(is_alive) => is_alive, - Err(e) => { - error!("Failed to check instance {url}: {e}", url = instance.url); - false + let mut tasks: JoinSet)>> = JoinSet::new(); + // Yes, this is not the most efficient way to do this, but it's the easiest way to shitcode it + // In this place it's not a big deal + // If you want to optimize it, feel free to do so + for (i, instance) in service.instances.iter().cloned().enumerate() { + let client = build_client(&service_clone, config, proxies, &instance)?; + let checker = services::get_instance_checker(name); + let service_clone = service.clone(); + tasks.spawn(async move { + info!("Checking instance: {}", instance.url); + let is_alive = { + let res = checker + .check(client.clone(), &service_clone, &instance) + .await; + match res { + Ok(is_alive) => is_alive, + Err(e) => { + error!("Failed to check instance {url}: {e}", url = instance.url); + false + } } - } - }; - debug!("Instance is alive: {}", is_alive); + }; + debug!("Instance is alive: {}", is_alive); + let tags = update_instance_tags(client, instance.url.clone(), &instance.tags).await; + Ok((i, is_alive, tags)) + }); + } + + while let Some(res) = tasks.join_next().await { + let (i, is_alive, tags) = res??; + let instance = service.instances.get_mut(i).unwrap(); let instance_history = match service_history.get_instance_mut(&instance.url) { Some(instance_history) => instance_history, @@ -121,8 +136,9 @@ async fn check_instances( instance_history.ping_history.cleanup(); instance_history.ping_history.push_ping(is_alive); - instance.tags = update_instance_tags(client, instance.url.clone(), &instance.tags).await; + instance.tags = tags; } + Ok(()) } diff --git a/fastside-actualizer/src/services/mod.rs b/fastside-actualizer/src/services/mod.rs index 3104b710..675f024f 100644 --- a/fastside-actualizer/src/services/mod.rs +++ b/fastside-actualizer/src/services/mod.rs @@ -6,7 +6,7 @@ use crate::types::ServiceUpdater; pub use default::DefaultInstanceChecker; /// Get a service updater by name. -pub fn get_service_updater(name: &str) -> Option> { +pub fn get_service_updater(name: &str) -> Option> { match name { "searx" => Some(Box::new(searx::SearxUpdater::new())), _ => None, @@ -14,7 +14,7 @@ pub fn get_service_updater(name: &str) -> Option> { } /// Get an instance checker by name. -pub fn get_instance_checker(name: &str) -> Box { +pub fn get_instance_checker(name: &str) -> Box { match name { "searx" => Box::new(searx::SearxUpdater::new()), _ => Box::new(DefaultInstanceChecker::new()),