From 5373a2673a2bb5ff85800d012195c985f751680a Mon Sep 17 00:00:00 2001 From: cofob Date: Fri, 30 Aug 2024 16:12:21 +0400 Subject: [PATCH 1/3] Config auto-reload --- fastside/src/crawler.rs | 116 +++++++++++++++++++++++++++----- fastside/src/main.rs | 102 ++++++++++++++++++++-------- fastside/src/routes/api.rs | 6 +- fastside/src/routes/config.rs | 6 +- fastside/src/routes/index.rs | 10 ++- fastside/src/routes/redirect.rs | 15 +++-- fastside/src/search.rs | 10 +-- fastside/templates/index.html | 4 +- 8 files changed, 205 insertions(+), 64 deletions(-) diff --git a/fastside/src/crawler.rs b/fastside/src/crawler.rs index ab30c2ec..f06c5194 100644 --- a/fastside/src/crawler.rs +++ b/fastside/src/crawler.rs @@ -7,7 +7,10 @@ use std::{ use chrono::{DateTime, Utc}; use reqwest::StatusCode; use thiserror::Error; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{Mutex, RwLock}, + time::sleep, +}; use url::Url; use crate::{config::CrawlerConfig, types::LoadedData}; @@ -84,37 +87,95 @@ pub struct CrawledServices { pub time: DateTime, } +#[derive(Debug)] +pub enum CrawledData { + CrawledServices(CrawledServices), + InitialLoading, + ReloadingServices { + current: CrawledServices, + new: Option, + }, +} + +impl CrawledData { + pub fn get_services(&self) -> Option<&CrawledServices> { + match self { + Self::CrawledServices(s) => Some(s), + Self::InitialLoading => None, + Self::ReloadingServices { current, .. } => Some(current), + } + } + + pub fn is_fetched(&self) -> bool { + self.get_services().is_some() + } + + pub fn is_reloading(&self) -> bool { + matches!(self, Self::ReloadingServices { .. }) + } + + pub fn replace(&mut self, new: CrawledData) { + *self = new; + } + + pub fn make_reloading(&mut self) { + let current = match self { + Self::CrawledServices(s) => s.clone(), + _ => return, + }; + *self = Self::ReloadingServices { current, new: None }; + } + + pub fn make_reloaded(&mut self) { + let new = match self { + Self::ReloadingServices { new, .. 
} => new.take(), + _ => return, + }; + if let Some(new) = new { + *self = Self::CrawledServices(new); + } + } +} + +impl AsRef for CrawledData { + fn as_ref(&self) -> &CrawledData { + self + } +} + #[derive(Debug)] pub struct Crawler { - loaded_data: Arc, + loaded_data: Arc>, config: Arc, - data: RwLock>, + data: RwLock, + crawler_lock: Mutex<()>, } impl Crawler { - pub fn new(loaded_data: Arc, config: CrawlerConfig) -> Self { + pub fn new(loaded_data: Arc>, config: CrawlerConfig) -> Self { Self { loaded_data, config: Arc::new(config), - data: RwLock::new(None), + data: RwLock::new(CrawledData::InitialLoading), + crawler_lock: Mutex::new(()), } } #[inline] - pub async fn read(&self) -> tokio::sync::RwLockReadGuard> { + pub async fn read(&self) -> tokio::sync::RwLockReadGuard { self.data.read().await } async fn crawl_single_instance( config: Arc, - loaded_data: Arc, + loaded_data: Arc>, service: Arc, instance: Instance, ) -> Result<(CrawledInstance, String), CrawlerError> { let client = build_client( service.as_ref(), config.as_ref(), - &loaded_data.proxies, + &loaded_data.read().await.proxies, &instance, )?; @@ -165,8 +226,15 @@ impl Crawler { } async fn crawl(&self) -> Result<(), CrawlerError> { + let Ok(crawler_guard) = self.crawler_lock.try_lock() else { + warn!("Crawler lock is already acquired, skipping crawl"); + return Ok(()); + }; + let mut crawled_services: HashMap = self .loaded_data + .read() + .await .services .keys() .map(|name| { @@ -181,7 +249,7 @@ impl Crawler { .collect(); let mut parallelise = Parallelise::with_capacity(self.config.max_concurrent_requests); - for service in self.loaded_data.services.values() { + for service in self.loaded_data.read().await.services.values() { let service = Arc::new(service.clone()); for instance in &service.instances { let loaded_data = self.loaded_data.clone(); @@ -190,7 +258,7 @@ impl Crawler { parallelise .push(tokio::spawn(Self::crawl_single_instance( config, - loaded_data, + loaded_data.clone(), service.clone(), instance, ))) @@ -216,17 +284,35 @@ impl Crawler { } let mut data = self.data.write().await; - if data.is_none() { - info!("Finished initial crawl, we are ready to serve requests"); - } - data.replace(CrawledServices { + data.replace(CrawledData::CrawledServices(CrawledServices { services: crawled_services, time: Utc::now(), - }); + })); + match data.as_ref() { + CrawledData::ReloadingServices { .. } => { + info!("Finished reloading services"); + } + CrawledData::InitialLoading => { + info!("Finished initial crawl, we are ready to serve requests"); + } + CrawledData::CrawledServices(_) => { + info!("Finished crawl"); + } + } + + drop(crawler_guard); Ok(()) } + /// Run crawler instantly in update loaded_data mode. 
+ pub async fn update_crawl(&self) -> Result<(), CrawlerError> { + let mut data = self.data.write().await; + data.make_reloading(); + drop(data); + self.crawl().await + } + pub async fn crawler_loop(&self) { loop { debug!("Starting crawl"); diff --git a/fastside/src/main.rs b/fastside/src/main.rs index d0220a27..cd456d6b 100644 --- a/fastside/src/main.rs +++ b/fastside/src/main.rs @@ -13,7 +13,7 @@ use clap::{Parser, Subcommand}; use config::load_config; use crawler::Crawler; use fastside_shared::{ - config, + config::{self, AppConfig}, errors::CliError, log_setup, serde_types::{ServicesData, StoredData}, @@ -28,6 +28,7 @@ use std::{ str::FromStr, sync::Arc, }; +use tokio::sync::RwLock; use types::{CompiledRegexSearch, LoadedData}; #[deny(unused_imports)] @@ -73,6 +74,57 @@ async fn crawler_loop(crawler: Arc) { crawler.crawler_loop().await } +// This function loads services file +fn load_services(data_path: &PathBuf, config: &AppConfig) -> Result { + if !data_path.is_file() { + return Err(anyhow::anyhow!( + "services file does not exist or is not a file" + )); + } + let data_content = + std::fs::read_to_string(data_path).context("failed to read services file")?; + let stored_data: StoredData = + serde_json::from_str(&data_content).context("failed to parse services file")?; + let services_data: ServicesData = stored_data + .services + .into_iter() + .map(|service| (service.name.clone(), service)) + .collect(); + Ok(LoadedData { + services: services_data, + proxies: config.proxies.clone(), + default_user_config: config.default_user_config.clone(), + }) +} + +// This functions check every 5 seconds if services file has changed and reloads it if it has. +async fn reload_services( + data_path: PathBuf, + config: Arc, + crawler: Arc, + data: Arc>, +) { + let mut file_stat = std::fs::metadata(&data_path).expect("failed to get file metadata"); + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let new_file_stat = std::fs::metadata(&data_path).expect("failed to get file metadata"); + if new_file_stat + .modified() + .expect("failed to get modified time") + != file_stat.modified().expect("failed to get modified time") + { + info!("Reloading services file"); + let new_data = load_services(&data_path, &config).expect("failed to load services"); + *data.write().await = new_data; + file_stat = new_file_stat; + crawler + .update_crawl() + .await + .expect("failed to update crawl"); + } + } +} + #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); @@ -85,40 +137,24 @@ async fn main() -> Result<()> { listen, workers, }) => { - let config = load_config(&cli.config).context("failed to load config")?; + let config = Arc::new(load_config(&cli.config).context("failed to load config")?); + + let data_path = services + .clone() + .or(config.services_path.clone()) + .unwrap_or_else(|| PathBuf::from_str("services.json").unwrap()); let listen: SocketAddr = listen .unwrap_or_else(|| SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), 8080))); let workers: usize = workers.unwrap_or_else(num_cpus::get); - let data: Arc = { - let data_path = services - .clone() - .or(config.services_path.clone()) - .unwrap_or_else(|| PathBuf::from_str("services.json").unwrap()); - if !data_path.is_file() { - return Err(anyhow::anyhow!( - "services file does not exist or is not a file" - )); - } - let data_content = - std::fs::read_to_string(data_path).context("failed to read services file")?; - let stored_data: StoredData = - serde_json::from_str(&data_content).context("failed to parse 
services file")?; - let services_data: ServicesData = stored_data - .services - .into_iter() - .map(|service| (service.name.clone(), service)) - .collect(); - let data = LoadedData { - services: services_data, - proxies: config.proxies.clone(), - default_user_config: config.default_user_config.clone(), - }; - - Arc::new(data) + let data: Arc> = { + let data = load_services(&data_path, &config)?; + Arc::new(RwLock::new(data)) }; let regexes: HashMap> = data + .read() + .await .services .iter() .filter_map(|(name, service)| { @@ -144,9 +180,16 @@ async fn main() -> Result<()> { let cloned_crawler = crawler.clone(); let crawler_loop_handle = tokio::spawn(crawler_loop(cloned_crawler)); + let reload_services_handle = tokio::spawn(reload_services( + data_path.clone(), + config.clone(), + crawler.clone(), + data.clone(), + )); + info!("Listening on {}", listen); - let config_web_data = web::Data::new(config.clone()); + let config_web_data = web::Data::from(config.clone()); let crawler_web_data = web::Data::from(crawler.clone()); let data_web_data = web::Data::from(data.clone()); let regexes_web_data = web::Data::new(regexes); @@ -167,6 +210,7 @@ async fn main() -> Result<()> { .await .context("failed to start api server")?; + reload_services_handle.abort(); crawler_loop_handle.abort(); } None => { diff --git a/fastside/src/routes/api.rs b/fastside/src/routes/api.rs index aedf5df8..c3b04ff5 100644 --- a/fastside/src/routes/api.rs +++ b/fastside/src/routes/api.rs @@ -1,6 +1,7 @@ use actix_web::{post, web, Responder, Scope}; use fastside_shared::config::UserConfig; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use crate::{ config::AppConfig, @@ -33,13 +34,14 @@ struct RedirectResponse { #[post("/redirect")] async fn redirect( crawler: web::Data, - loaded_data: web::Data, + loaded_data: web::Data>, regexes: web::Data, redirect_request: web::Json, ) -> actix_web::Result { + let loaded_data_guard = loaded_data.read().await; let (url, is_fallback) = super::redirect::find_redirect( crawler.as_ref(), - loaded_data.as_ref(), + &loaded_data_guard, regexes.as_ref(), &redirect_request.config, &redirect_request.url, diff --git a/fastside/src/routes/config.rs b/fastside/src/routes/config.rs index 9d45ee67..443fb4ff 100644 --- a/fastside/src/routes/config.rs +++ b/fastside/src/routes/config.rs @@ -1,6 +1,7 @@ use actix_web::{cookie::Cookie, get, http::header::LOCATION, web, HttpRequest, Responder, Scope}; use askama::Template; use fastside_shared::config::UserConfig; +use tokio::sync::RwLock; use crate::{ config::AppConfig, errors::RedirectError, types::LoadedData, @@ -22,9 +23,10 @@ pub struct ConfigureTemplate<'a> { #[get("")] async fn configure_page( req: HttpRequest, - loaded_data: web::Data, + loaded_data: web::Data>, ) -> actix_web::Result { - let user_config = load_settings_cookie(&req, &loaded_data.default_user_config); + let loaded_data_guard = loaded_data.read().await; + let user_config = load_settings_cookie(&req, &loaded_data_guard.default_user_config); let template = ConfigureTemplate { current_config: &user_config diff --git a/fastside/src/routes/index.rs b/fastside/src/routes/index.rs index eb360be9..a4883707 100644 --- a/fastside/src/routes/index.rs +++ b/fastside/src/routes/index.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use actix_web::{get, web, Responder, Scope}; use askama::Template; use chrono::{DateTime, Utc}; +use tokio::sync::RwLock; use crate::{ config::AppConfig, @@ -32,21 +33,24 @@ pub struct IndexTemplate<'a> { pub crawled_services: &'a HashMap, pub services: 
&'a ServicesData, pub time: &'a DateTime, + pub is_reloading: bool, } #[get("/")] async fn index( crawler: web::Data, - loaded_data: web::Data, + loaded_data: web::Data>, ) -> actix_web::Result { let data = crawler.read().await; - let Some(crawled_services) = data.as_ref() else { + let Some(crawled_services) = data.get_services() else { return Err(RedirectError::from(SearchError::CrawlerNotFetchedYet))?; }; + let loaded_data_guard = loaded_data.read().await; let template = IndexTemplate { - services: &loaded_data.services, + services: &loaded_data_guard.services, crawled_services: &crawled_services.services, time: &crawled_services.time, + is_reloading: data.is_reloading(), }; Ok(actix_web::HttpResponse::Ok() diff --git a/fastside/src/routes/redirect.rs b/fastside/src/routes/redirect.rs index c0547f7d..7979f904 100644 --- a/fastside/src/routes/redirect.rs +++ b/fastside/src/routes/redirect.rs @@ -4,6 +4,7 @@ use actix_web::{ web, HttpRequest, Responder, Scope, }; use askama::Template; +use tokio::sync::RwLock; use crate::{ config::AppConfig, @@ -42,15 +43,16 @@ async fn cached_redirect( path: web::Path<(String, String)>, config: web::Data, crawler: web::Data, - loaded_data: web::Data, + loaded_data: web::Data>, ) -> actix_web::Result { let (service_name, _) = path.into_inner(); - let user_config = load_settings_cookie(&req, &loaded_data.default_user_config); + let loaded_data_guard = loaded_data.read().await; + let user_config = load_settings_cookie(&req, &loaded_data_guard.default_user_config); let guard = crawler.read().await; let (crawled_service, _) = - find_redirect_service_by_name(&guard, &loaded_data.services, &service_name) + find_redirect_service_by_name(&guard, &loaded_data_guard.services, &service_name) .await .map_err(RedirectError::from)?; let mut instances = get_redirect_instances( @@ -166,16 +168,17 @@ async fn base_redirect( req: HttpRequest, path: web::Path, crawler: web::Data, - loaded_data: web::Data, + loaded_data: web::Data>, regexes: web::Data, ) -> actix_web::Result { let path = path.into_inner(); - let user_config = load_settings_cookie(&req, &loaded_data.default_user_config); + let loaded_data_guard = loaded_data.read().await; + let user_config = load_settings_cookie(&req, &loaded_data_guard.default_user_config); let (mut url, is_fallback) = find_redirect( crawler.get_ref(), - loaded_data.get_ref(), + &loaded_data_guard, regexes.get_ref(), &user_config, &path, diff --git a/fastside/src/search.rs b/fastside/src/search.rs index eaac9e8f..33485f01 100644 --- a/fastside/src/search.rs +++ b/fastside/src/search.rs @@ -4,7 +4,7 @@ use regex::Captures; use tokio::sync::RwLockReadGuard; use crate::{ - crawler::{CrawledInstance, CrawledInstanceStatus, CrawledService, CrawledServices}, + crawler::{CrawledData, CrawledInstance, CrawledInstanceStatus, CrawledService}, types::Regexes, }; use fastside_shared::{ @@ -27,11 +27,11 @@ pub enum SearchError { } pub async fn find_redirect_service_by_name<'a>( - guard: &'a RwLockReadGuard<'a, Option>, + guard: &'a RwLockReadGuard<'a, CrawledData>, services: &'a ServicesData, query: &str, ) -> Result<(&'a CrawledService, &'a Service), SearchError> { - let data = match guard.as_ref() { + let data = match guard.get_services() { Some(data) => data, None => return Err(SearchError::CrawlerNotFetchedYet), }; @@ -143,12 +143,12 @@ fn replace_args_in_url(url: &str, captures: Captures) -> Result( - guard: &'a RwLockReadGuard<'a, Option>, + guard: &'a RwLockReadGuard<'a, CrawledData>, services: &'a ServicesData, regexes: &'a Regexes, query: &str, ) 
-> Result<(&'a CrawledService, &'a Service, String), SearchError> { - let data = match guard.as_ref() { + let data = match guard.get_services() { Some(data) => data, None => return Err(SearchError::CrawlerNotFetchedYet), }; diff --git a/fastside/templates/index.html b/fastside/templates/index.html index cf9d6262..e702ea53 100644 --- a/fastside/templates/index.html +++ b/fastside/templates/index.html @@ -3,7 +3,7 @@ {% block content %}

 Fastside [GitHub] [Configure]
-    Last synced {{ time }}
+    Last synced {{ time }}{% if is_reloading %} (config is reloading){% endif %}

-{% endblock %} \ No newline at end of file +{% endblock %} From a6131f274df99837ffa771fc51f9e6c0abcc77ad Mon Sep 17 00:00:00 2001 From: cofob Date: Fri, 30 Aug 2024 16:37:30 +0400 Subject: [PATCH 2/3] Auto update config from URL --- fastside-shared/src/config.rs | 20 ++++- fastside/src/main.rs | 152 ++++++++++++++++++++++++++-------- 2 files changed, 138 insertions(+), 34 deletions(-) diff --git a/fastside-shared/src/config.rs b/fastside-shared/src/config.rs index bd01c549..44a4eda2 100644 --- a/fastside-shared/src/config.rs +++ b/fastside-shared/src/config.rs @@ -99,17 +99,35 @@ impl UserConfig { } } +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct AutoUpdaterConfig { + #[serde(default)] + pub enabled: bool, + pub interval: Duration, +} + +impl Default for AutoUpdaterConfig { + fn default() -> Self { + Self { + enabled: false, + interval: Duration::from_secs(60), + } + } +} + /// Application configuration. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct AppConfig { #[serde(default)] pub crawler: CrawlerConfig, #[serde(default)] + pub auto_updater: AutoUpdaterConfig, + #[serde(default)] pub proxies: ProxyData, #[serde(default)] pub default_user_config: UserConfig, #[serde(default)] - pub services_path: Option, + pub services: Option, } /// Load application configuration. diff --git a/fastside/src/main.rs b/fastside/src/main.rs index cd456d6b..e2da5522 100644 --- a/fastside/src/main.rs +++ b/fastside/src/main.rs @@ -30,6 +30,7 @@ use std::{ }; use tokio::sync::RwLock; use types::{CompiledRegexSearch, LoadedData}; +use url::Url; #[deny(unused_imports)] #[deny(unused_variables)] @@ -57,9 +58,9 @@ struct Cli { enum Commands { /// Run API server. Serve { - /// Services file path. + /// Services path. #[arg(short, long)] - services: Option, + services: Option, /// Listen socket address. #[arg(short, long)] listen: Option, @@ -74,15 +75,48 @@ async fn crawler_loop(crawler: Arc) { crawler.crawler_loop().await } -// This function loads services file -fn load_services(data_path: &PathBuf, config: &AppConfig) -> Result { - if !data_path.is_file() { - return Err(anyhow::anyhow!( - "services file does not exist or is not a file" - )); +#[derive(Debug)] +enum ServicesSource { + Filesystem(PathBuf), + Remote(Url), +} + +impl FromStr for ServicesSource { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + if let Ok(url) = Url::parse(s) { + Ok(ServicesSource::Remote(url)) + } else { + Ok(ServicesSource::Filesystem(PathBuf::from(s))) + } } - let data_content = - std::fs::read_to_string(data_path).context("failed to read services file")?; +} + +// This function loads services from file or remote source +async fn load_services_data(source: &ServicesSource) -> Result { + debug!("Loading services from {:?}", source); + Ok(match source { + ServicesSource::Filesystem(path) => { + if !path.is_file() { + return Err(anyhow::anyhow!( + "services file does not exist or is not a file" + )); + } + std::fs::read_to_string(path).context("failed to read services file")? + } + ServicesSource::Remote(url) => reqwest::get(url.clone()) + .await + .context("failed to fetch services file")? 
+ .text() + .await + .context("failed to read services file")?, + }) +} + +// This function loads services file +async fn load_services(source: &ServicesSource, config: &AppConfig) -> Result { + let data_content = load_services_data(source).await?; let stored_data: StoredData = serde_json::from_str(&data_content).context("failed to parse services file")?; let services_data: ServicesData = stored_data @@ -99,28 +133,77 @@ fn load_services(data_path: &PathBuf, config: &AppConfig) -> Result // This functions check every 5 seconds if services file has changed and reloads it if it has. async fn reload_services( - data_path: PathBuf, + source: ServicesSource, config: Arc, crawler: Arc, data: Arc>, ) { - let mut file_stat = std::fs::metadata(&data_path).expect("failed to get file metadata"); - loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let new_file_stat = std::fs::metadata(&data_path).expect("failed to get file metadata"); - if new_file_stat - .modified() - .expect("failed to get modified time") - != file_stat.modified().expect("failed to get modified time") - { - info!("Reloading services file"); - let new_data = load_services(&data_path, &config).expect("failed to load services"); - *data.write().await = new_data; - file_stat = new_file_stat; - crawler - .update_crawl() + if !config.auto_updater.enabled { + debug!("Auto updater is disabled"); + return; + } + let reload_interval = config.auto_updater.interval.as_secs(); + match &source { + ServicesSource::Filesystem(path) => { + let mut file_stat = std::fs::metadata(path).expect("failed to get file metadata"); + loop { + tokio::time::sleep(std::time::Duration::from_secs(reload_interval)).await; + let new_file_stat = std::fs::metadata(path).expect("failed to get file metadata"); + debug!("File modified: {:?}", new_file_stat.modified()); + if new_file_stat + .modified() + .expect("failed to get modified time") + != file_stat.modified().expect("failed to get modified time") + { + info!("Reloading services file"); + let new_data = load_services(&source, &config) + .await + .expect("failed to load services"); + *data.write().await = new_data; + file_stat = new_file_stat; + crawler + .update_crawl() + .await + .expect("failed to update crawl"); + } + } + } + ServicesSource::Remote(url) => { + let client = reqwest::Client::new(); + let mut etag = client + .head(url.clone()) + .send() .await - .expect("failed to update crawl"); + .expect("failed to send HEAD request") + .headers() + .get("etag") + .map(|header| header.to_str().expect("failed to parse etag").to_string()) + .expect("failed to get etag"); + loop { + tokio::time::sleep(std::time::Duration::from_secs(reload_interval)).await; + let new_etag = client + .head(url.clone()) + .send() + .await + .expect("failed to send HEAD request") + .headers() + .get("etag") + .map(|header| header.to_str().expect("failed to parse etag").to_string()) + .expect("failed to get etag"); + debug!("Etag: {}", etag); + if new_etag != etag { + info!("Reloading services file"); + let new_data = load_services(&source, &config) + .await + .expect("failed to load services"); + *data.write().await = new_data; + etag = new_etag; + crawler + .update_crawl() + .await + .expect("failed to update crawl"); + } + } } } } @@ -139,17 +222,20 @@ async fn main() -> Result<()> { }) => { let config = Arc::new(load_config(&cli.config).context("failed to load config")?); - let data_path = services - .clone() - .or(config.services_path.clone()) - .unwrap_or_else(|| PathBuf::from_str("services.json").unwrap()); + 
let services_source = ServicesSource::from_str( + &services + .clone() + .or(config.services.clone()) + .unwrap_or_else(|| String::from("services.json")), + )?; + debug!("Using services source: {:?}", services_source); let listen: SocketAddr = listen .unwrap_or_else(|| SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), 8080))); let workers: usize = workers.unwrap_or_else(num_cpus::get); let data: Arc> = { - let data = load_services(&data_path, &config)?; + let data = load_services(&services_source, &config).await?; Arc::new(RwLock::new(data)) }; let regexes: HashMap> = data @@ -181,7 +267,7 @@ async fn main() -> Result<()> { let crawler_loop_handle = tokio::spawn(crawler_loop(cloned_crawler)); let reload_services_handle = tokio::spawn(reload_services( - data_path.clone(), + services_source, config.clone(), crawler.clone(), data.clone(), From e4a5e046d5b3fd47180eeaaf8f62412de4932c37 Mon Sep 17 00:00:00 2001 From: cofob Date: Fri, 30 Aug 2024 16:46:19 +0400 Subject: [PATCH 3/3] Wrap reload services --- fastside-shared/src/config.rs | 3 +- fastside/src/main.rs | 78 +++++++++++++++++++++++++---------- 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/fastside-shared/src/config.rs b/fastside-shared/src/config.rs index 44a4eda2..67d3ac69 100644 --- a/fastside-shared/src/config.rs +++ b/fastside-shared/src/config.rs @@ -101,7 +101,6 @@ impl UserConfig { #[derive(Deserialize, Serialize, Debug, Clone)] pub struct AutoUpdaterConfig { - #[serde(default)] pub enabled: bool, pub interval: Duration, } @@ -109,7 +108,7 @@ pub struct AutoUpdaterConfig { impl Default for AutoUpdaterConfig { fn default() -> Self { Self { - enabled: false, + enabled: true, interval: Duration::from_secs(60), } } diff --git a/fastside/src/main.rs b/fastside/src/main.rs index e2da5522..39262857 100644 --- a/fastside/src/main.rs +++ b/fastside/src/main.rs @@ -133,38 +133,37 @@ async fn load_services(source: &ServicesSource, config: &AppConfig) -> Result, crawler: Arc, data: Arc>, -) { - if !config.auto_updater.enabled { - debug!("Auto updater is disabled"); - return; - } +) -> Result<()> { let reload_interval = config.auto_updater.interval.as_secs(); match &source { ServicesSource::Filesystem(path) => { - let mut file_stat = std::fs::metadata(path).expect("failed to get file metadata"); + let mut file_stat = std::fs::metadata(path).context("failed to get file metadata")?; loop { tokio::time::sleep(std::time::Duration::from_secs(reload_interval)).await; - let new_file_stat = std::fs::metadata(path).expect("failed to get file metadata"); + let new_file_stat = + std::fs::metadata(path).context("failed to get file metadata")?; debug!("File modified: {:?}", new_file_stat.modified()); if new_file_stat .modified() - .expect("failed to get modified time") - != file_stat.modified().expect("failed to get modified time") + .context("failed to get modified time")? + != file_stat + .modified() + .context("failed to get modified time")? { info!("Reloading services file"); - let new_data = load_services(&source, &config) + let new_data = load_services(source, &config) .await - .expect("failed to load services"); + .context("failed to load services")?; *data.write().await = new_data; file_stat = new_file_stat; crawler .update_crawl() .await - .expect("failed to update crawl"); + .context("failed to update crawl")?; } } } @@ -174,40 +173,64 @@ async fn reload_services( .head(url.clone()) .send() .await - .expect("failed to send HEAD request") + .context("failed to send HEAD request")? 
.headers() .get("etag") .map(|header| header.to_str().expect("failed to parse etag").to_string()) - .expect("failed to get etag"); + .context("failed to get etag")?; loop { tokio::time::sleep(std::time::Duration::from_secs(reload_interval)).await; let new_etag = client .head(url.clone()) .send() .await - .expect("failed to send HEAD request") + .context("failed to send HEAD request")? .headers() .get("etag") .map(|header| header.to_str().expect("failed to parse etag").to_string()) - .expect("failed to get etag"); + .context("failed to get etag")?; debug!("Etag: {}", etag); if new_etag != etag { info!("Reloading services file"); - let new_data = load_services(&source, &config) + let new_data = load_services(source, &config) .await - .expect("failed to load services"); + .context("failed to load services")?; *data.write().await = new_data; etag = new_etag; crawler .update_crawl() .await - .expect("failed to update crawl"); + .context("failed to update crawl")?; } } } } } +async fn reload_services_wrapper( + source: ServicesSource, + config: Arc, + crawler: Arc, + data: Arc>, +) { + if !config.auto_updater.enabled { + debug!("Auto updater is disabled"); + return; + } + let mut restart_counter = 0; + loop { + if let Err(e) = + reload_services(&source, config.clone(), crawler.clone(), data.clone()).await + { + error!("Failed to reload services: {}", e); + restart_counter += 1; + } + let restart_in = 60 * restart_counter; + error!("Reload services failed, retrying in {}", restart_in); + tokio::time::sleep(std::time::Duration::from_secs(restart_in)).await; + } +} + #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); @@ -226,7 +249,18 @@ async fn main() -> Result<()> { &services .clone() .or(config.services.clone()) - .unwrap_or_else(|| String::from("services.json")), + .unwrap_or_else(|| { + // If services.json exists in the current directory, use it. + if PathBuf::from("services.json").is_file() { + debug!("Using services.json in the current directory"); + return String::from("services.json"); + } + // Otherwise, use the default services source. + debug!("Using default services source"); + String::from( + "https://raw.githubusercontent.com/cofob/fastside/master/services.json", + ) + }), )?; debug!("Using services source: {:?}", services_source); @@ -266,7 +300,7 @@ async fn main() -> Result<()> { let cloned_crawler = crawler.clone(); let crawler_loop_handle = tokio::spawn(crawler_loop(cloned_crawler)); - let reload_services_handle = tokio::spawn(reload_services( + let reload_services_handle = tokio::spawn(reload_services_wrapper( services_source, config.clone(), crawler.clone(),
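
Editor's note: the snippet below is a minimal standalone sketch, not part of the patches. It isolates the ETag-based change detection that the remote `ServicesSource` branch of `reload_services` relies on, assuming `reqwest` and `tokio` as the patch does; the helper name `poll_etag` and its parameters are illustrative only, and the real code additionally reloads `LoadedData` and calls `crawler.update_crawl()` when a change is detected.

    // Illustrative sketch of the ETag polling approach used for remote services
    // sources. Names and structure are hypothetical; error handling is simplified.
    use reqwest::{header::ETAG, Client};
    use std::time::Duration;

    async fn poll_etag(client: &Client, url: &str, interval: Duration) -> reqwest::Result<()> {
        // Fetch the initial ETag with a HEAD request so the body is not downloaded.
        let mut etag = client
            .head(url)
            .send()
            .await?
            .headers()
            .get(ETAG)
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        loop {
            tokio::time::sleep(interval).await;
            let new_etag = client
                .head(url)
                .send()
                .await?
                .headers()
                .get(ETAG)
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned);
            // A changed ETag means the upstream services file was updated.
            if new_etag != etag {
                etag = new_etag;
                // At this point the real implementation re-fetches the file,
                // swaps the shared LoadedData, and triggers a re-crawl.
            }
        }
    }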