Skip to content

Commit

Permalink
Clean up error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
JustinTimperio committed Sep 7, 2024
1 parent c83c2d8 commit 1ea0ccd
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 212 deletions.
26 changes: 23 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ path = "src/lib.rs"
[dependencies]
bincode = "1.3.3"
chrono = { version = "0.4.38", features = ["serde"] }
csv = "1.3.0"
rand = "0.8.4"
redb = "2.1.1"
serde = { version = "1.0.208", features = ["derive"] }
serde_with = { version = "3.9.0", features = ["chrono"] }
thiserror = "1.0.63"
tokio = { version = "1.10.0", features = ["full"] }
uuid = { version = "1.10.0", features = ["v4"] }

Expand Down
10 changes: 3 additions & 7 deletions graphs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,9 @@ async fn bench(
_ = async {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let results = rpq_clone.prioritize().await;

if !results.is_err() {
let (r, e) = results.unwrap();
removed_clone.fetch_add(r, Ordering::SeqCst);
escalated_clone.fetch_add(e, Ordering::SeqCst);
}
let (r, e) = rpq_clone.prioritize().await;
removed_clone.fetch_add(r, Ordering::SeqCst);
escalated_clone.fetch_add(e, Ordering::SeqCst);
}
} => {}
}
Expand Down
95 changes: 33 additions & 62 deletions src/disk.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::error::Error;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::vec::Vec;

use redb::{Database, ReadableTableMetadata, TableDefinition};
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::errors::DiskError;
use crate::schema;

pub struct DiskCache<T: Clone + Send> {
Expand All @@ -20,21 +18,26 @@ impl<T: Clone + Send> DiskCache<T>
where
T: Serialize + DeserializeOwned,
{
pub fn new(path: &str) -> DiskCache<T> {
let db = Database::create(path).unwrap();

// Create the initial table
let ctxn = db.begin_write().unwrap();
ctxn.open_table(DB).unwrap();
ctxn.commit().unwrap();

DiskCache {
db,
phantom: std::marker::PhantomData,
pub fn new(path: &str) -> Result<DiskCache<T>, DiskError> {
let db = Database::create(path);

match db {
Ok(db) => {
// Create the initial table
let ctxn = db.begin_write().unwrap();
ctxn.open_table(DB).unwrap();
ctxn.commit().unwrap();

Ok(DiskCache {
db,
phantom: std::marker::PhantomData,
})
}
Err(e) => Err(DiskError::DatabaseError(e)),
}
}

pub fn commit_batch(&self, write_cache: &mut Vec<schema::Item<T>>) -> Result<(), Box<dyn Error>>
pub fn commit_batch(&self, write_cache: &mut Vec<schema::Item<T>>) -> Result<(), DiskError>
where
T: Serialize + DeserializeOwned,
{
Expand All @@ -43,21 +46,15 @@ where
let mut table = write_txn.open_table(DB).unwrap();
let b = item.to_bytes();
if b.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error converting item to bytes",
)));
return Err(DiskError::ItemSerdeError(b.err().unwrap()));
}

let b = b.unwrap();
let key = item.get_disk_uuid().unwrap();

let was_written = table.insert(key.as_str(), &b[..]);
if was_written.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error writing item to disk cache",
)));
return Err(DiskError::StorageError(was_written.err().unwrap()));
}
}

Expand All @@ -66,10 +63,7 @@ where
Ok(())
}

pub fn delete_batch(
&self,
delete_cache: &mut Vec<schema::Item<T>>,
) -> Result<(), Box<dyn Error>>
pub fn delete_batch(&self, delete_cache: &mut Vec<schema::Item<T>>) -> Result<(), DiskError>
where
T: Serialize + DeserializeOwned,
{
Expand All @@ -79,10 +73,7 @@ where
let key = item.get_disk_uuid().unwrap();
let was_deleted = table.remove(key.as_str());
if was_deleted.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error deleting item from disk cache",
)));
return Err(DiskError::StorageError(was_deleted.err().unwrap()));
}
}
write_txn.commit().unwrap();
Expand All @@ -91,45 +82,35 @@ where
Ok(())
}

pub fn commit_single(&self, item: schema::Item<T>) -> Result<(), Box<dyn Error>>
pub fn commit_single(&self, item: schema::Item<T>) -> Result<(), DiskError>
where
T: Serialize + DeserializeOwned,
{
let write_txn = self.db.begin_write().unwrap();
{
let mut table = write_txn.open_table(DB).unwrap();
let b = item.to_bytes();

if b.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error converting item to bytes",
)));
return Err(DiskError::ItemSerdeError(b.err().unwrap()));
}
let b = b.unwrap();

let disk_uuid = item.get_disk_uuid();
if disk_uuid.is_none() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error getting disk uuid",
)));
return Err(DiskError::DiskUuidError);
}

let b = b.unwrap();
let was_written = table.insert(disk_uuid.unwrap().as_str(), &b[..]);
if was_written.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error writing item to disk cache",
)));
return Err(DiskError::StorageError(was_written.err().unwrap()));
}
}
write_txn.commit().unwrap();

Ok(())
}

pub fn delete_single(&self, key: &str) -> Result<(), Box<dyn Error>>
pub fn delete_single(&self, key: &str) -> Result<(), DiskError>
where
T: Serialize + DeserializeOwned,
{
Expand All @@ -138,18 +119,15 @@ where
let mut table = write_txn.open_table(DB).unwrap();
let was_removed = table.remove(key);
if was_removed.is_err() {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error deleting item from disk cache",
)));
return Err(DiskError::StorageError(was_removed.err().unwrap()));
}
}
write_txn.commit().unwrap();

Ok(())
}

pub fn return_items_from_disk(&self) -> Result<Vec<schema::Item<T>>, Box<dyn Error>>
pub fn return_items_from_disk(&self) -> Result<Vec<schema::Item<T>>, DiskError>
where
T: Serialize + DeserializeOwned,
{
Expand All @@ -159,23 +137,16 @@ where

let cursor = match table.range::<&str>(..) {
Ok(range) => range,
Err(e) => {
return Err(Box::<dyn Error>::from(e));
}
Err(e) => return Err(DiskError::StorageError(e)),
};

// Restore the items from the disk cache
for entry in cursor {
match entry {
Ok((_key, value)) => {
let item = schema::Item::from_bytes(value.value());

if item.is_err() {
println!("Error reading from disk cache: {:?}", item.err());
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error reading from disk cache",
)));
return Err(DiskError::ItemSerdeError(item.err().unwrap()));
}

// Mark the item as restored
Expand All @@ -184,7 +155,7 @@ where
items.push(i);
}
Err(e) => {
return Err(Box::<dyn Error>::from(e));
return Err(DiskError::StorageError(e));
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use redb::{DatabaseError, StorageError};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum RPQError {
#[error("Disk error")]
DiskError(DiskError),
#[error("Item error")]
ItemError(ItemError),

#[error("Error sending to channel")]
ChannelSendError,

#[error("Error receiving from channel")]
ChannelRecvError,
}

#[derive(Error, Debug)]
pub enum DiskError {
#[error("Database error")]
DatabaseError(#[from] DatabaseError),

#[error("No disk uuid was set")]
DiskUuidError,

#[error("Error on the disk cache")]
StorageError(#[from] StorageError),

#[error("Error de/serializing item")]
ItemSerdeError(#[from] ItemError),

#[error("Disk cache not initialized")]
DiskCacheNotInitialized,
}

#[derive(Error, Debug)]
pub enum ItemError {
#[error("Error de/serializing item")]
ItemSerdeError(#[from] bincode::Error),

#[error("Empty byte array during deserialization")]
EmptyByteArray,
}
Loading

0 comments on commit 1ea0ccd

Please sign in to comment.