Skip to content

Commit

Permalink
Check instances in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
cofob committed Aug 4, 2024
1 parent 74f48e1 commit b9fda6e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
50 changes: 33 additions & 17 deletions fastside-actualizer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use fastside_shared::{
serde_types::{Service, StoredData},
};
use serde_types::ActualizerData;
use tokio::task::JoinSet;
use utils::{log_err::LogErrResult, normalize::normalize_instances, tags::update_instance_tags};

mod serde_types;
Expand Down Expand Up @@ -88,28 +89,42 @@ async fn check_instances(
service: &mut Service,
config: &CrawlerConfig,
) -> Result<()> {
let checker = services::get_instance_checker(name);
let service_history = actualizer_data
.services
.entry(name.to_string())
.or_default();
let service_clone = service.clone();
for instance in service.instances.iter_mut() {
info!("Checking instance: {}", instance.url);
let client = build_client(&service_clone, config, proxies, instance)?;
let is_alive = {
let res = checker
.check(client.clone(), &service_clone, instance)
.await;
match res {
Ok(is_alive) => is_alive,
Err(e) => {
error!("Failed to check instance {url}: {e}", url = instance.url);
false
let mut tasks: JoinSet<Result<(usize, bool, Vec<String>)>> = JoinSet::new();
// Yes, this is not the most efficient way to do this, but it's the easiest way to shitcode it
// In this place it's not a big deal
// If you want to optimize it, feel free to do so
for (i, instance) in service.instances.iter().cloned().enumerate() {
let client = build_client(&service_clone, config, proxies, &instance)?;
let checker = services::get_instance_checker(name);
let service_clone = service.clone();
tasks.spawn(async move {
info!("Checking instance: {}", instance.url);
let is_alive = {
let res = checker
.check(client.clone(), &service_clone, &instance)
.await;
match res {
Ok(is_alive) => is_alive,
Err(e) => {
error!("Failed to check instance {url}: {e}", url = instance.url);
false
}
}
}
};
debug!("Instance is alive: {}", is_alive);
};
debug!("Instance is alive: {}", is_alive);
let tags = update_instance_tags(client, instance.url.clone(), &instance.tags).await;
Ok((i, is_alive, tags))
});
}

while let Some(res) = tasks.join_next().await {
let (i, is_alive, tags) = res??;
let instance = service.instances.get_mut(i).unwrap();

let instance_history = match service_history.get_instance_mut(&instance.url) {
Some(instance_history) => instance_history,
Expand All @@ -121,8 +136,9 @@ async fn check_instances(
instance_history.ping_history.cleanup();
instance_history.ping_history.push_ping(is_alive);

instance.tags = update_instance_tags(client, instance.url.clone(), &instance.tags).await;
instance.tags = tags;
}

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions fastside-actualizer/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use crate::types::ServiceUpdater;
pub use default::DefaultInstanceChecker;

/// Get a service updater by name.
pub fn get_service_updater(name: &str) -> Option<Box<dyn ServiceUpdater>> {
pub fn get_service_updater(name: &str) -> Option<Box<dyn ServiceUpdater + Send>> {
match name {
"searx" => Some(Box::new(searx::SearxUpdater::new())),
_ => None,
}
}

/// Get an instance checker by name.
pub fn get_instance_checker(name: &str) -> Box<dyn crate::types::InstanceChecker> {
pub fn get_instance_checker(name: &str) -> Box<dyn crate::types::InstanceChecker + Send> {
match name {
"searx" => Box::new(searx::SearxUpdater::new()),
_ => Box::new(DefaultInstanceChecker::new()),
Expand Down

0 comments on commit b9fda6e

Please sign in to comment.