From 17bbbcb44d8cd4331449313fea93ad8efdf73306 Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Tue, 30 Jul 2024 11:12:26 -0600 Subject: [PATCH] rsc: Allow config for chunking blob eviction + no sleep until caught up (#1610) --- rust/rsc/src/bin/rsc/main.rs | 46 ++++++++++++++++++++---------------- rust/rsc/src/database.rs | 3 ++- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs index cdf23c36c..9785b865e 100644 --- a/rust/rsc/src/bin/rsc/main.rs +++ b/rust/rsc/src/bin/rsc/main.rs @@ -227,33 +227,41 @@ fn launch_job_eviction(conn: Arc, tick_interval: u64, ttl: u fn launch_blob_eviction( conn: Arc, - tick_interval: u64, - setup_ttl: u64, + config: Arc, blob_stores: HashMap>, ) { - // TODO: This should probably be a transaction so that a job can't add a new reference to a - // blob as we are deleting it. - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(tick_interval)); + let mut interval = + tokio::time::interval(Duration::from_secs(config.blob_eviction.tick_rate)); + let mut should_sleep = false; loop { - interval.tick().await; tracing::info!("Blob TTL eviction tick"); + if should_sleep { + interval.tick().await; + } // Blobs must be at least this old to be considered for eviction. // This gives clients time to reference a blob before it gets evicted. - let setup_ttl = (Utc::now() - Duration::from_secs(setup_ttl)).naive_utc(); + let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc(); - let blobs = match database::read_unreferenced_blobs(conn.as_ref(), setup_ttl).await { + let blobs = match database::read_unreferenced_blobs( + conn.as_ref(), + ttl, + config.blob_eviction.chunk_size, + ) + .await + { Ok(b) => b, Err(err) => { tracing::error!(%err, "Failed to fetch blobs for eviction"); + should_sleep = true; continue; // Try again on the next tick } }; let blob_ids: Vec = blobs.iter().map(|blob| blob.id).collect(); let eligible = blob_ids.len(); + should_sleep = eligible == 0; tracing::info!(%eligible, "At least N blobs eligible for eviction"); @@ -262,6 +270,7 @@ fn launch_blob_eviction( Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"), Err(err) => { tracing::error!(%err, "Failed to delete blobs from db for eviction"); + should_sleep = true; continue; // Try again on the next tick } }; @@ -317,11 +326,6 @@ async fn main() -> Result<(), Box> { let config = config::RSCConfig::new()?; let config = Arc::new(config); - if args.show_config { - println!("{}", serde_json::to_string_pretty(&config).unwrap()); - return Ok(()); - } - // setup a subscriber for logging let _guard = if let Some(log_directory) = config.log_directory.clone() { let file_appender = tracing_appender::rolling::daily(log_directory, "rsc.log"); @@ -334,6 +338,13 @@ async fn main() -> Result<(), Box> { None }; + let config_json = serde_json::to_string_pretty(&config).unwrap(); + if args.show_config { + println!("{}", config_json); + return Ok(()); + } + tracing::info!(%config_json, "Launching RSC with config"); + // Increase the number of allowed open files the the max request_max_fileno_limit(); @@ -352,12 +363,7 @@ async fn main() -> Result<(), Box> { config::RSCJobEvictionConfig::LRU(_) => panic!("LRU not implemented"), } - launch_blob_eviction( - connection.clone(), - config.blob_eviction.tick_rate, - config.blob_eviction.ttl, - stores.clone(), - ); + launch_blob_eviction(connection.clone(), config.clone(), stores.clone()); // Launch the server let router = create_router(connection.clone(), config.clone(), &stores); diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index da7d18e51..cd3c12671 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -626,6 +626,7 @@ pub async fn upsert_blob( pub async fn read_unreferenced_blobs( db: &T, ttl: NaiveDateTime, + chunk: u32, ) -> Result, DbErr> { // Limit = 16k as the query is also subject to parameter max. // Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction @@ -648,7 +649,7 @@ pub async fn read_unreferenced_blobs( ) LIMIT $2 "#, - [ttl.into(), (MAX_SQLX_PARAMS / 4).into()], + [ttl.into(), chunk.into()], )) .all(db) .await