Skip to content

Commit

Permalink
rsc: Allow config for chunking blob eviction + no sleep until caught …
Browse files Browse the repository at this point in the history
…up (#1610)
  • Loading branch information
V-FEXrt authored Jul 30, 2024
1 parent 839625f commit 17bbbcb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
46 changes: 26 additions & 20 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,33 +227,41 @@ fn launch_job_eviction(conn: Arc<DatabaseConnection>, tick_interval: u64, ttl: u

fn launch_blob_eviction(
conn: Arc<DatabaseConnection>,
tick_interval: u64,
setup_ttl: u64,
config: Arc<config::RSCConfig>,
blob_stores: HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
) {
// TODO: This should probably be a transaction so that a job can't add a new reference to a
// blob as we are deleting it.

tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(tick_interval));
let mut interval =
tokio::time::interval(Duration::from_secs(config.blob_eviction.tick_rate));
let mut should_sleep = false;
loop {
interval.tick().await;
tracing::info!("Blob TTL eviction tick");
if should_sleep {
interval.tick().await;
}

// Blobs must be at least this old to be considered for eviction.
// This gives clients time to reference a blob before it gets evicted.
let setup_ttl = (Utc::now() - Duration::from_secs(setup_ttl)).naive_utc();
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();

let blobs = match database::read_unreferenced_blobs(conn.as_ref(), setup_ttl).await {
let blobs = match database::read_unreferenced_blobs(
conn.as_ref(),
ttl,
config.blob_eviction.chunk_size,
)
.await
{
Ok(b) => b,
Err(err) => {
tracing::error!(%err, "Failed to fetch blobs for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

let blob_ids: Vec<Uuid> = blobs.iter().map(|blob| blob.id).collect();
let eligible = blob_ids.len();
should_sleep = eligible == 0;

tracing::info!(%eligible, "At least N blobs eligible for eviction");

Expand All @@ -262,6 +270,7 @@ fn launch_blob_eviction(
Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"),
Err(err) => {
tracing::error!(%err, "Failed to delete blobs from db for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};
Expand Down Expand Up @@ -317,11 +326,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = config::RSCConfig::new()?;
let config = Arc::new(config);

if args.show_config {
println!("{}", serde_json::to_string_pretty(&config).unwrap());
return Ok(());
}

// setup a subscriber for logging
let _guard = if let Some(log_directory) = config.log_directory.clone() {
let file_appender = tracing_appender::rolling::daily(log_directory, "rsc.log");
Expand All @@ -334,6 +338,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None
};

let config_json = serde_json::to_string_pretty(&config).unwrap();
if args.show_config {
println!("{}", config_json);
return Ok(());
}
tracing::info!(%config_json, "Launching RSC with config");

// Increase the number of allowed open files to the max
request_max_fileno_limit();

Expand All @@ -352,12 +363,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
config::RSCJobEvictionConfig::LRU(_) => panic!("LRU not implemented"),
}

launch_blob_eviction(
connection.clone(),
config.blob_eviction.tick_rate,
config.blob_eviction.ttl,
stores.clone(),
);
launch_blob_eviction(connection.clone(), config.clone(), stores.clone());

// Launch the server
let router = create_router(connection.clone(), config.clone(), &stores);
Expand Down
3 changes: 2 additions & 1 deletion rust/rsc/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ pub async fn upsert_blob<T: ConnectionTrait>(
pub async fn read_unreferenced_blobs<T: ConnectionTrait>(
db: &T,
ttl: NaiveDateTime,
chunk: u32,
) -> Result<Vec<blob::Model>, DbErr> {
// The limit is supplied by the caller via `chunk`. It was previously fixed at 16k as the query is also subject to parameter max.
// Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction
Expand All @@ -648,7 +649,7 @@ pub async fn read_unreferenced_blobs<T: ConnectionTrait>(
)
LIMIT $2
"#,
[ttl.into(), (MAX_SQLX_PARAMS / 4).into()],
[ttl.into(), chunk.into()],
))
.all(db)
.await
Expand Down

0 comments on commit 17bbbcb

Please sign in to comment.