chore(dev): gridfs storage default to handle large taxa model fields
esteinig committed Mar 5, 2025
1 parent 79cff02 commit e4cfa4d
Showing 8 changed files with 196 additions and 57 deletions.
2 changes: 1 addition & 1 deletion cerebro/stack/client/src/client.rs
@@ -196,7 +196,7 @@ impl CerebroClient {
team,
db,
project,
- max_model_size: Some(16.0)
+ max_model_size: Some(100.0)
})
}
pub fn log_team_warning(&self) {
2 changes: 1 addition & 1 deletion cerebro/stack/pipe/src/taxa/filters.rs
@@ -94,7 +94,7 @@ pub fn apply_filters(mut taxa: Vec<Taxon>, filter_config: &TaxonFilterConfig, sa
})
.for_each(|record| {
if let Some(tags) = sample_tags.get(&record.id) {
- // Extract nucleic acid tag (DNA/RNA)for NTC record if present
+ // Extract nucleic acid tag (DNA/RNA) for NTC record if present
if let Some(tag) = tags.iter().find(|tag| tag == &&"DNA".to_string() || tag == &&"RNA".to_string()) {
let key = (record.tool.clone(), record.mode.clone(), tag.clone());
// Sum RPM values for matching keys
67 changes: 67 additions & 0 deletions cerebro/stack/server/src/api/cerebro/gridfs.rs
@@ -0,0 +1,67 @@
use std::collections::HashMap;

use cerebro_pipe::taxa::taxon::Taxon;
use futures::io::{AsyncWriteExt, AsyncReadExt};

use mongodb::gridfs::GridFsBucket;


/// Uploads the taxa HashMap to GridFS as a JSON-encoded file.
///
/// # Arguments
/// * `bucket` - A GridFsBucket instance to perform the upload.
/// * `taxa` - A reference to the taxa HashMap to be stored.
/// * `filename` - A string slice for the filename to be stored in GridFS.
///
/// # Returns
/// * On success, returns the filename under which the taxa data was stored in GridFS.
///
/// **Note:** In v3 you open an upload stream, write the data to it, and then close the stream
/// to finalize the upload.
pub async fn upload_taxa_to_gridfs(
bucket: GridFsBucket,
taxa: &HashMap<String, Taxon>,
filename: &str,
) -> Result<String, Box<dyn std::error::Error>> {

// Serialize taxa to JSON bytes.
let taxa_bytes = serde_json::to_vec(taxa)?;

// Open an upload stream for the given filename.
let mut upload_stream = bucket.open_upload_stream(filename).await?;

// Write all the bytes to the upload stream.
upload_stream.write_all(&taxa_bytes).await?;

// Close the stream to finalize the upload
upload_stream.close().await?;

Ok(filename.to_string())
}

/// Downloads the taxa HashMap from GridFS using the provided filename.
///
/// # Arguments
/// * `bucket` - A GridFsBucket instance to perform the download.
/// * `filename` - The filename under which the taxa data was stored in GridFS.
///
/// # Returns
/// * On success, returns the taxa HashMap.
///
/// **Note:** open_download_stream_by_name returns a GridFsDownloadStream, which is read
/// asynchronously into a byte buffer before deserialization.
pub async fn download_taxa_from_gridfs(
bucket: GridFsBucket,
filename: &str,
) -> Result<HashMap<String, Taxon>, Box<dyn std::error::Error>> {
// Open the download stream using the filename.
let mut download_stream = bucket.open_download_stream_by_name(filename).await?;

let mut bytes: Vec<u8> = Vec::new();
let _ = download_stream.read_to_end(&mut bytes).await?;

// Deserialize the bytes back into a HashMap.
let taxa: HashMap<String, Taxon> = serde_json::from_slice(&bytes)?;

Ok(taxa)
}
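
A minimal sketch of how these helpers might be called from a route handler, assuming the bucket is opened from the team database and the Cerebro document id is reused as the GridFS filename; the bucket name, module path, and helper functions below are illustrative assumptions, not code from this commit:

use std::collections::HashMap;

use cerebro_pipe::taxa::taxon::Taxon;
use mongodb::{gridfs::GridFsBucket, options::GridFsBucketOptions, Database};

use crate::api::cerebro::gridfs::{download_taxa_from_gridfs, upload_taxa_to_gridfs};

// Open the bucket that holds serialized taxa payloads (bucket name is an assumption).
fn taxa_bucket(db: &Database) -> GridFsBucket {
    db.gridfs_bucket(GridFsBucketOptions::builder().bucket_name("taxa".to_string()).build())
}

// Hypothetical: store a model's taxa under its document id and return the GridFS filename key.
async fn store_taxa(db: &Database, cerebro_id: &str, taxa: &HashMap<String, Taxon>) -> Result<String, Box<dyn std::error::Error>> {
    upload_taxa_to_gridfs(taxa_bucket(db), taxa, cerebro_id).await
}

// Hypothetical: load the taxa back when a client requests the full model.
async fn load_taxa(db: &Database, cerebro_id: &str) -> Result<HashMap<String, Taxon>, Box<dyn std::error::Error>> {
    download_taxa_from_gridfs(taxa_bucket(db), cerebro_id).await
}

Keeping the serialized taxa in GridFS keeps the parent Cerebro document under MongoDB's 16 MB BSON document limit, while the raised client and JSON transfer limits elsewhere in this commit let the reassembled model move over the API.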
166 changes: 121 additions & 45 deletions cerebro/stack/server/src/api/cerebro/handler.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion cerebro/stack/server/src/api/cerebro/mod.rs
@@ -1,4 +1,5 @@
//! Cerebro module for all sample data related models and route handlers
pub mod mongo;
- pub mod handler;
+ pub mod handler;
+ pub mod gridfs;
1 change: 1 addition & 0 deletions cerebro/stack/server/src/api/cerebro/mongo.rs
@@ -95,6 +95,7 @@ pub fn get_matched_id_taxa_cerebro_pipeline(
doc! { "$match": match_conditions },
doc! {
"$project": {
"id": 1,
"taxa": 1,
"name": 1,
"sample_tags": "$sample.tags",
2 changes: 1 addition & 1 deletion cerebro/stack/server/src/api/server.rs
@@ -32,7 +32,7 @@ MAIN ASYNC LAUNCH THROUGH CLI

/// Main application configuration
fn app_config(cfg: &mut web::ServiceConfig) {
- let json_cfg = web::JsonConfig::default().limit(16 * 1024 * 1024); // 16 MB transfer limit for MongoDB model size limits
+ let json_cfg = web::JsonConfig::default().limit(100 * 1024 * 1024); // 100 MB transfer limit for MongoDB model size limits (large size usually due to taxa which are now stored in GridFS)
cfg.app_data(json_cfg);

}
10 changes: 2 additions & 8 deletions cerebro/stack/server/src/api/teams/handler.rs
@@ -1,13 +1,11 @@



- use cerebro_model::api::users::model::{UserId, User, Role};
+ use cerebro_model::api::users::model::{UserId, User};
use cerebro_model::api::teams::model::{TeamId, Team, TeamDatabase, ProjectCollection};
use cerebro_model::api::teams::schema::{RegisterTeamSchema, RegisterDatabaseSchema, RegisterProjectSchema, UpdateTeamSchema};
use cerebro_model::api::utils::AdminCollection;

- use mongodb::options::BulkWriteOptions;

use crate::api::auth::jwt::{self, TeamAccessQuery, TeamDatabaseAccessQuery};
use crate::api::server::AppState;
use crate::api::utils::get_cerebro_db_collection;
@@ -199,10 +197,6 @@ async fn register_team_database_project_handler(
Err(err) => return err
};

- let project_update_options = mongodb::options::UpdateOptions::builder().array_filters(
-     vec![doc! { "db.id": &database.id }]
- ).build();

let new_project = ProjectCollection::from_project_schema(&body.into_inner());

// Build an update model that includes your array filters.
@@ -216,7 +210,7 @@

// Execute the bulk write with a single update model.
match data.db.bulk_write(vec![update_model]).await {
- Ok(result) => {
+ Ok(_) => {
let json_response = serde_json::json!({
"status": "success",
"message": "Added project to team database",
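
The array filters that previously travelled in a separate `UpdateOptions` value are now attached to the write model itself. A rough sketch of what such a model could look like with the driver's client-level bulk write (the v3 API); the namespace, document field names, and the `WriteModel::UpdateOne` wrapping are assumptions for illustration, not the handler's actual code:

use mongodb::{bson::{doc, Bson}, options::{UpdateOneModel, WriteModel}, Client, Namespace};

// Hypothetical: push a new project into the matching database entry of a team document.
async fn add_project_to_team_database(client: &Client, team_id: &str, database_id: &str, project: Bson) -> mongodb::error::Result<()> {
    let update_model = WriteModel::UpdateOne(
        UpdateOneModel::builder()
            .namespace(Namespace::new("cerebro-admin", "teams"))        // illustrative namespace
            .filter(doc! { "id": team_id })                             // select the team document
            .update(doc! { "$push": { "databases.$[db].projects": project } })
            .array_filters(vec![doc! { "db.id": database_id }])         // the filter the removed UpdateOptions used to carry
            .build(),
    );

    // Execute the bulk write with a single update model, as in the handler above.
    client.bulk_write(vec![update_model]).await?;
    Ok(())
}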
