From 73a8c849f1dd5360db2a4f232a28e76a4396323d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 17:16:37 +0530 Subject: [PATCH 1/4] refactor: unnest --- src/{correlation/mod.rs => correlation.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{correlation/mod.rs => correlation.rs} (100%) diff --git a/src/correlation/mod.rs b/src/correlation.rs similarity index 100% rename from src/correlation/mod.rs rename to src/correlation.rs From 4a3883f6aee4f6859ba5e09b73d6b32e27f5ef54 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 17:28:24 +0530 Subject: [PATCH 2/4] refactor: don't branch --- src/correlation.rs | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 9df9c900b..cf2c7bd82 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -54,17 +54,11 @@ impl Correlation { .into_iter() .flat_map(|(_, correlations_bytes)| correlations_bytes) .filter_map(|correlation| { - if correlation.is_empty() { - None - } else { - match serde_json::from_slice(&correlation) { - Ok(correlation_config) => Some(correlation_config), - Err(e) => { - error!("Unable to load correlation: {e}"); - None - } - } - } + serde_json::from_slice(&correlation) + .inspect_err(|e| { + error!("Unable to load correlation: {e}"); + }) + .ok() }) .collect(); @@ -107,13 +101,11 @@ impl Correlation { .find(|c| c.id == correlation_id && c.user_id == user_id) .cloned(); - if let Some(c) = correlation { - Ok(c) - } else { - Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + correlation.ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( "Unable to find correlation with ID- {correlation_id}" - )))) - } + ))) + }) } pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { From 99978cfb04622d1018cd505563fcaf070269a3bc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 17:40:05 +0530 Subject: [PATCH 3/4] don't await collection --- src/storage/localfs.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 0bb2ec12b..5d8041fc2 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -27,7 +27,7 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use fs_extra::file::CopyOptions; -use futures::{stream::FuturesUnordered, TryStreamExt}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use relative_path::{RelativePath, RelativePathBuf}; use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; @@ -430,17 +430,16 @@ impl ObjectStorage for LocalFS { ) -> Result>, ObjectStorageError> { let mut correlations: HashMap> = HashMap::new(); let users_root_path = self.root.join(USERS_ROOT_DIR); - let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); - let users: Vec = directories.try_collect().await?; - for user in users { + let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); + while let Some(user) = directories.next().await { + let user = user?; if !user.path().is_dir() { continue; } let correlations_path = users_root_path.join(user.path()).join("correlations"); - let directories = ReadDirStream::new(fs::read_dir(&correlations_path).await?); - let correlations_files: Vec = directories.try_collect().await?; - for correlation in correlations_files { - let correlation_absolute_path = correlation.path(); + let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?); + while let Some(correlation) = files.next().await { + let correlation_absolute_path = correlation?.path(); let file = fs::read(correlation_absolute_path.clone()).await?; let correlation_relative_path = correlation_absolute_path .strip_prefix(self.root.as_path()) From 393d3744437a3ac644661425229b429c10eb7554 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 17:58:27 +0530 Subject: [PATCH 4/4] refactor: string construction --- src/storage/azure_blob.rs | 18 ++++++------------ src/storage/s3.rs | 18 ++++++------------ 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index f65484259..6d75c2e5b 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -678,10 +678,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -716,10 +714,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -767,10 +763,8 @@ impl ObjectStorage for BlobStore { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_correlation_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "correlations" - )); + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations")); let correlations_path = RelativePathBuf::from(&user_correlation_path); let correlation_bytes = self .get_objects( diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 886685667..e115dd0bf 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -811,10 +811,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_dashboard_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "dashboards" - )); + let user_dashboard_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards")); let dashboards_path = RelativePathBuf::from(&user_dashboard_path); let dashboard_bytes = self .get_objects( @@ -849,10 +847,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_filters_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "filters" - )); + let user_filters_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",)); let resp = self .client .list_with_delimiter(Some(&user_filters_path)) @@ -900,10 +896,8 @@ impl ObjectStorage for S3 { .map(|name| name.as_ref().to_string()) .collect::>(); for user in users { - let user_correlation_path = object_store::path::Path::from(format!( - "{}/{}/{}", - USERS_ROOT_DIR, user, "correlations" - )); + let user_correlation_path = + object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",)); let correlations_path = RelativePathBuf::from(&user_correlation_path); let correlation_bytes = self .get_objects(