Config auto-reload #12

Merged · 3 commits · Aug 30, 2024
19 changes: 18 additions & 1 deletion fastside-shared/src/config.rs
@@ -99,17 +99,34 @@ impl UserConfig {
    }
}

+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct AutoUpdaterConfig {
+    pub enabled: bool,
+    pub interval: Duration,
+}
+
+impl Default for AutoUpdaterConfig {
+    fn default() -> Self {
+        Self {
+            enabled: true,
+            interval: Duration::from_secs(60),
+        }
+    }
+}
+
/// Application configuration.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AppConfig {
    #[serde(default)]
    pub crawler: CrawlerConfig,
+    #[serde(default)]
+    pub auto_updater: AutoUpdaterConfig,
    #[serde(default)]
    pub proxies: ProxyData,
    #[serde(default)]
    pub default_user_config: UserConfig,
    #[serde(default)]
    pub services_path: Option<PathBuf>,
    pub services: Option<String>,
}

/// Load application configuration.
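A usage note, not part of this diff: because every field of `AppConfig` is defaulted or optional, a config file only needs an `auto_updater` section to tune reloading. Below is a minimal sketch, assuming the crate is importable as `fastside_shared` and feeding it JSON via `serde_json`; note that serde's built-in impl for `std::time::Duration` deserializes from a `{ secs, nanos }` struct.

```rust
use std::time::Duration;

use fastside_shared::config::AppConfig; // crate path assumed

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Only `auto_updater` is given; every other field falls back to
    // #[serde(default)] or `None`.
    let raw = r#"{
        "auto_updater": { "enabled": true, "interval": { "secs": 30, "nanos": 0 } }
    }"#;
    let config: AppConfig = serde_json::from_str(raw)?;
    assert!(config.auto_updater.enabled);
    assert_eq!(config.auto_updater.interval, Duration::from_secs(30));
    Ok(())
}
```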
116 changes: 101 additions & 15 deletions fastside/src/crawler.rs
@@ -7,7 +7,10 @@ use std::{
use chrono::{DateTime, Utc};
use reqwest::StatusCode;
use thiserror::Error;
-use tokio::{sync::RwLock, time::sleep};
+use tokio::{
+    sync::{Mutex, RwLock},
+    time::sleep,
+};
use url::Url;

use crate::{config::CrawlerConfig, types::LoadedData};
@@ -84,37 +87,95 @@ pub struct CrawledServices {
    pub time: DateTime<Utc>,
}

+#[derive(Debug)]
+pub enum CrawledData {
+    CrawledServices(CrawledServices),
+    InitialLoading,
+    ReloadingServices {
+        current: CrawledServices,
+        new: Option<CrawledServices>,
+    },
+}
+
+impl CrawledData {
+    pub fn get_services(&self) -> Option<&CrawledServices> {
+        match self {
+            Self::CrawledServices(s) => Some(s),
+            Self::InitialLoading => None,
+            Self::ReloadingServices { current, .. } => Some(current),
+        }
+    }
+
+    pub fn is_fetched(&self) -> bool {
+        self.get_services().is_some()
+    }
+
+    pub fn is_reloading(&self) -> bool {
+        matches!(self, Self::ReloadingServices { .. })
+    }
+
+    pub fn replace(&mut self, new: CrawledData) {
+        *self = new;
+    }
+
+    pub fn make_reloading(&mut self) {
+        let current = match self {
+            Self::CrawledServices(s) => s.clone(),
+            _ => return,
+        };
+        *self = Self::ReloadingServices { current, new: None };
+    }
+
+    pub fn make_reloaded(&mut self) {
+        let new = match self {
+            Self::ReloadingServices { new, .. } => new.take(),
+            _ => return,
+        };
+        if let Some(new) = new {
+            *self = Self::CrawledServices(new);
+        }
+    }
+}
+
+impl AsRef<CrawledData> for CrawledData {
+    fn as_ref(&self) -> &CrawledData {
+        self
+    }
+}

#[derive(Debug)]
pub struct Crawler {
-    loaded_data: Arc<LoadedData>,
+    loaded_data: Arc<RwLock<LoadedData>>,
    config: Arc<CrawlerConfig>,
-    data: RwLock<Option<CrawledServices>>,
+    data: RwLock<CrawledData>,
+    crawler_lock: Mutex<()>,
}

impl Crawler {
-    pub fn new(loaded_data: Arc<LoadedData>, config: CrawlerConfig) -> Self {
+    pub fn new(loaded_data: Arc<RwLock<LoadedData>>, config: CrawlerConfig) -> Self {
        Self {
            loaded_data,
            config: Arc::new(config),
-            data: RwLock::new(None),
+            data: RwLock::new(CrawledData::InitialLoading),
+            crawler_lock: Mutex::new(()),
        }
    }

    #[inline]
-    pub async fn read(&self) -> tokio::sync::RwLockReadGuard<Option<CrawledServices>> {
+    pub async fn read(&self) -> tokio::sync::RwLockReadGuard<CrawledData> {
        self.data.read().await
    }

    async fn crawl_single_instance(
        config: Arc<CrawlerConfig>,
-        loaded_data: Arc<LoadedData>,
+        loaded_data: Arc<RwLock<LoadedData>>,
        service: Arc<Service>,
        instance: Instance,
    ) -> Result<(CrawledInstance, String), CrawlerError> {
        let client = build_client(
            service.as_ref(),
            config.as_ref(),
-            &loaded_data.proxies,
+            &loaded_data.read().await.proxies,
            &instance,
        )?;

@@ -165,8 +226,15 @@ impl Crawler {
    }

    async fn crawl(&self) -> Result<(), CrawlerError> {
+        let Ok(crawler_guard) = self.crawler_lock.try_lock() else {
+            warn!("Crawler lock is already acquired, skipping crawl");
+            return Ok(());
+        };
+
        let mut crawled_services: HashMap<String, CrawledService> = self
            .loaded_data
+            .read()
+            .await
            .services
            .keys()
            .map(|name| {
@@ -181,7 +249,7 @@
            .collect();
        let mut parallelise = Parallelise::with_capacity(self.config.max_concurrent_requests);

-        for service in self.loaded_data.services.values() {
+        for service in self.loaded_data.read().await.services.values() {
            let service = Arc::new(service.clone());
            for instance in &service.instances {
                let loaded_data = self.loaded_data.clone();
@@ -190,7 +258,7 @@
                parallelise
                    .push(tokio::spawn(Self::crawl_single_instance(
                        config,
-                        loaded_data,
+                        loaded_data.clone(),
                        service.clone(),
                        instance,
                    )))
@@ -216,17 +284,35 @@
        }

        let mut data = self.data.write().await;
-        if data.is_none() {
-            info!("Finished initial crawl, we are ready to serve requests");
-        }
-        data.replace(CrawledServices {
+        // Log based on the state we were in; with the match placed after the
+        // replace, the reloading/initial arms could never fire.
+        match data.as_ref() {
+            CrawledData::ReloadingServices { .. } => {
+                info!("Finished reloading services");
+            }
+            CrawledData::InitialLoading => {
+                info!("Finished initial crawl, we are ready to serve requests");
+            }
+            CrawledData::CrawledServices(_) => {
+                info!("Finished crawl");
+            }
+        }
+
+        data.replace(CrawledData::CrawledServices(CrawledServices {
            services: crawled_services,
            time: Utc::now(),
-        });
+        }));
+
+        drop(crawler_guard);
        Ok(())
    }

+    /// Run the crawler immediately after `loaded_data` changes, keeping the
+    /// current results available while the services are re-crawled.
+    pub async fn update_crawl(&self) -> Result<(), CrawlerError> {
+        let mut data = self.data.write().await;
+        data.make_reloading();
+        drop(data);
+        self.crawl().await
+    }
+
    pub async fn crawler_loop(&self) {
        loop {
            debug!("Starting crawl");
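Taken together: `CrawledData` starts in `InitialLoading`, becomes `CrawledServices` after the first crawl, and passes through `ReloadingServices` while a reload crawl runs, so readers never lose the current data. A minimal sketch of those transitions, not from the PR, assuming `CrawledData` and `CrawledServices` are in scope; the empty `CrawledServices` is purely illustrative:

```rust
use std::collections::HashMap;

use chrono::Utc;

fn demo_state_transitions() {
    // Fresh crawler: nothing fetched yet.
    let mut data = CrawledData::InitialLoading;
    assert!(!data.is_fetched() && !data.is_reloading());

    // First crawl finished: the placeholder is replaced wholesale.
    data.replace(CrawledData::CrawledServices(CrawledServices {
        services: HashMap::new(),
        time: Utc::now(),
    }));
    assert!(data.is_fetched());

    // A reload begins: readers keep getting `current` via get_services().
    data.make_reloading();
    assert!(data.is_reloading() && data.is_fetched());
}
```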
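Presumably the new `AutoUpdaterConfig` drives `update_crawl` from a background task alongside `crawler_loop`; this diff does not show that wiring, so the sketch below is hypothetical (the `auto_update_loop` name and the elided reload step are assumptions):

```rust
use std::sync::Arc;

use log::warn;

// Hypothetical wiring, not part of this diff: poll on the configured
// interval and trigger a reload-mode crawl each time.
async fn auto_update_loop(crawler: Arc<Crawler>, config: AutoUpdaterConfig) {
    if !config.enabled {
        return;
    }
    loop {
        tokio::time::sleep(config.interval).await;
        // Re-reading the services file into `loaded_data` would happen here;
        // the crawler then refreshes while still serving the old results.
        if let Err(e) = crawler.update_crawl().await {
            warn!("auto update crawl failed: {e}");
        }
    }
}
```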