Fix clippy errors and bug hunting
JustinTimperio committed Sep 7, 2024
1 parent 4fa6ea4 commit c83c2d8
Showing 6 changed files with 105 additions and 115 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -64,7 +64,7 @@ rpq = "0.2.0"
- `dequeue() -> Result<Item, Error>` - Dequeues the next item from the RPQ.
- `dequeue_batch(count: usize) -> Result<Vec<Item>, Error>` - Dequeues a batch of items from the RPQ.
- `prioritize() -> Result<(usize, usize), Error>` - Prioritizes the items in the RPQ and returns the number of timed out and reprioritized items.
- `len() -> usize` - Returns the number of items in the RPQ.
- `items_in_queue() -> usize` - Returns the number of items in the RPQ.
- `active_buckets() -> usize` - Returns the number of active buckets in the RPQ.
- `unsynced_batches() -> usize` - Returns the number of unsynced batches pending commit to disk.
- `items_in_db() -> usize` - Returns the number of items in the RPQ database.
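
The `len()` → `items_in_queue()` rename above plausibly exists to satisfy clippy's `len_without_is_empty` lint, which flags a public `len()` with no matching `is_empty()` (an assumption; the commit message only says "Fix clippy errors"). A minimal sketch with a stand-in type, not the RPQ API:

```rust
// Hypothetical illustration of `len_without_is_empty`; `Queue` is a
// stand-in, not the RPQ type.
struct Queue {
    items: Vec<u64>,
}

impl Queue {
    // A public `len()` without a matching `is_empty()` trips the lint;
    // renaming the accessor (or adding `is_empty()`) silences it.
    pub fn items_in_queue(&self) -> usize {
        self.items.len()
    }
}

fn main() {
    let q = Queue { items: vec![1, 2, 3] };
    assert_eq!(q.items_in_queue(), 3);
}
```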
@@ -83,7 +83,7 @@ async fn main() {
let options = RPQOptions {
max_priority: 10,
disk_cache_enabled: true,
database_path: "/tmp/rpq-prioritize.redb".to_string(),
database_path: "/tmp/rpq.redb".to_string(),
lazy_disk_cache: true,
lazy_disk_write_delay: Duration::seconds(5),
lazy_disk_cache_batch_size: 10_000,
2 changes: 1 addition & 1 deletion src/disk.rs
@@ -165,7 +165,7 @@ where
};

// Restore the items from the disk cache
for (_i, entry) in cursor.enumerate() {
for entry in cursor {
match entry {
Ok((_key, value)) => {
let item = schema::Item::from_bytes(value.value());
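The one-line change above drops an `.enumerate()` whose index was never used, the pattern clippy's `unused_enumerate_index` lint flags. A stand-in sketch (a plain vector instead of the redb cursor):

```rust
// Before: for (_i, entry) in entries.iter().enumerate() { ... }
// After: iterate over the entries directly.
fn main() {
    let entries: Vec<Result<u32, String>> = vec![Ok(1), Ok(2), Err("corrupt".into())];

    for entry in &entries {
        match entry {
            Ok(value) => println!("restored item {value}"),
            Err(e) => eprintln!("skipping entry: {e}"),
        }
    }
}
```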
107 changes: 50 additions & 57 deletions src/lib.rs
@@ -55,11 +55,11 @@
//!
//! for _i in 0..message_count {
//! let result = rpq.dequeue().await;
//! if result.is_err() {
//! println!("Error Dequeuing: {}", result.err().unwrap());
//! if result.is_err() {
//! println!("Error Dequeuing: {}", result.err().unwrap());
//! return;
//! }
//! }
//! }
//!
//! rpq.close().await;
//! }
@@ -236,9 +236,9 @@ where
disk_cache: disk_cache_clone_two,

lazy_disk_sync_handles: Mutex::new(sync_handles),
lazy_disk_writer_sender: lazy_disk_writer_sender,
lazy_disk_writer_sender,
lazy_disk_writer_receiver: Mutex::new(lazy_disk_writer_receiver),
lazy_disk_delete_sender: lazy_disk_delete_sender,
lazy_disk_delete_sender,
lazy_disk_delete_receiver: Mutex::new(lazy_disk_delete_receiver),
lazy_disk_shutdown_receiver: shutdown_receiver,
lazy_disk_shutdown_sender: shutdown_sender,
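
The two changed lines above use field init shorthand where the variable already matches the field name, clippy's `redundant_field_names` lint. A stand-in sketch, not the RPQ internals:

```rust
use std::sync::mpsc::{channel, Sender};

struct LazyDiskWriter {
    sender: Sender<u64>,
}

fn main() {
    let (sender, _receiver) = channel::<u64>();
    // Before: LazyDiskWriter { sender: sender } — the name is repeated.
    // After: the shorthand used in the commit.
    let _writer = LazyDiskWriter { sender };
}
```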
@@ -254,7 +254,7 @@ where
if disk_cache_enabled {
let result = rpq.restore_from_disk().await;
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
restored_items = result?;

@@ -319,7 +319,7 @@ where
match result {
Ok(_) => {}
Err(e) => {
return Err(Box::<dyn Error>::from(e));
return Err(e);
}
}
}
@@ -377,7 +377,7 @@ where
match result {
Ok(_) => {}
Err(e) => {
return Err(Box::<dyn Error>::from(e));
return Err(e);
}
}
}
@@ -509,7 +509,7 @@ where
}

/// Returns the number of items in the RPQ across all buckets
pub async fn len(&self) -> usize {
pub async fn items_in_queue(&self) -> usize {
self.queue.lock().await.items_in_queue()
}

@@ -581,7 +581,7 @@ where
Some(db) => {
let result = db.commit_batch(batch);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
@@ -597,7 +597,7 @@ where
item = receiver.recv() => {
if let Some(item) = item {
let batch_bucket = item.get_batch_id();
let batch = awaiting_batches.entry(batch_bucket).or_insert(Vec::new());
let batch = awaiting_batches.entry(batch_bucket).or_default();
batch.push(item);
}
},
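
`entry(...).or_insert(Vec::new())` becomes `or_default()` here and in the later hunks; clippy steers this rewrite, likely via its `or_fun_call` and `unwrap_or_default` lints. A minimal sketch:

```rust
use std::collections::HashMap;

fn main() {
    let mut awaiting_batches: HashMap<u32, Vec<&str>> = HashMap::new();

    // Before: awaiting_batches.entry(1).or_insert(Vec::new()).push("item");
    // After (as in the commit): `or_default()` says the same thing without
    // spelling out the default value.
    awaiting_batches.entry(1).or_default().push("item");
    awaiting_batches.entry(1).or_default().push("another");

    assert_eq!(awaiting_batches[&1].len(), 2);
}
```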
@@ -609,7 +609,7 @@ where
// Pull the remaining items from the receiver
while let Some(item) = receiver.recv().await {
let batch_bucket = item.get_batch_id();
let batch = awaiting_batches.entry(batch_bucket).or_insert(Vec::new());
let batch = awaiting_batches.entry(batch_bucket).or_default();
batch.push(item);
}

@@ -636,7 +636,7 @@ where
Some(db) => {
let result = db.commit_batch(batch);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
@@ -675,7 +675,7 @@ where
Some(db) => {
let result = db.delete_batch(&mut restored_items);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
@@ -685,7 +685,7 @@ where

// If the item was not restored, add it to the batch
let batch_bucket = item.get_batch_id();
let mut batch = awaiting_batches.entry(batch_bucket).or_insert(Vec::new());
let batch = awaiting_batches.entry(batch_bucket).or_default();
batch.push(item);

// Check if the batch is full
@@ -702,9 +702,9 @@ where
)));
}
Some(db) => {
let result = db.delete_batch(&mut batch);
let result = db.delete_batch(batch);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
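
This hunk also drops a doubled mutable borrow: `or_default()` already returns `&mut Vec<_>`, so the old `delete_batch(&mut batch)` passed a needless `&mut &mut Vec<_>` (clippy: `needless_borrow`). A stand-in sketch:

```rust
use std::collections::HashMap;

// Stand-in for the disk cache's delete; the real signature lives in src/disk.rs.
fn delete_batch(batch: &mut Vec<u64>) {
    batch.clear();
}

fn main() {
    let mut awaiting_batches: HashMap<u32, Vec<u64>> = HashMap::new();
    awaiting_batches.entry(7).or_default().extend([1, 2, 3]);

    // `or_default()` already yields `&mut Vec<u64>`, so no extra `&mut`
    // is needed at the call site.
    let batch = awaiting_batches.entry(7).or_default();
    delete_batch(batch);

    assert!(awaiting_batches[&7].is_empty());
}
```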
@@ -734,7 +734,7 @@ where

// If the item was not restored, add it to the batch
let batch_bucket = item.get_batch_id();
let batch = awaiting_batches.entry(batch_bucket).or_insert(Vec::new());
let batch = awaiting_batches.entry(batch_bucket).or_default();
batch.push(item);
}

@@ -751,12 +751,12 @@ where
Some(db) => {
let result = db.delete_batch(&mut restored_items);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
}
for (id, mut batch) in awaiting_batches.iter_mut() {
for (id, batch) in awaiting_batches.iter_mut() {
let mut batch_handler = self.lazy_disk_batch_handler.lock().await;
let was_synced = batch_handler.synced_batches.get(id).unwrap_or(&false);
if *was_synced {
@@ -769,9 +769,9 @@ where
)));
}
Some(db) => {
let result = db.delete_batch(&mut batch);
let result = db.delete_batch(batch);
if result.is_err() {
return Err(Box::<dyn Error>::from(result.err().unwrap()));
return Err(result.err().unwrap());
}
}
}
@@ -791,16 +791,14 @@ where
async fn restore_from_disk(&self) -> Result<usize, Box<dyn Error>> {
let db = self.disk_cache.as_ref();
match db {
None => {
return Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error getting disk cache",
)));
}
None => Err(Box::<dyn Error>::from(IoError::new(
ErrorKind::InvalidInput,
"Error getting disk cache",
))),
Some(db) => {
let restored_items = db.return_items_from_disk();
if restored_items.is_err() {
return Err(Box::<dyn Error>::from(restored_items.err().unwrap()));
return Err(restored_items.err().unwrap());
}
let restored_items = restored_items?;
let total_items = restored_items.len();
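
The rewritten `restore_from_disk` turns the `match` into the function's tail expression, so the `None` arm no longer needs an explicit `return`. A self-contained sketch with an `Option<&str>` standing in for the optional disk cache:

```rust
use std::error::Error;
use std::io::{Error as IoError, ErrorKind};

// The match is now the last expression of the function, so each arm
// evaluates to the Result directly.
fn restore_from_disk(db: Option<&str>) -> Result<usize, Box<dyn Error>> {
    match db {
        None => Err(Box::<dyn Error>::from(IoError::new(
            ErrorKind::InvalidInput,
            "Error getting disk cache",
        ))),
        Some(_db) => Ok(0),
    }
}

fn main() {
    assert!(restore_from_disk(None).is_err());
    assert_eq!(restore_from_disk(Some("cache")).unwrap(), 0);
}
```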
@@ -864,7 +862,7 @@ mod tests {
}

rpq.close().await;
assert_eq!(rpq.len().await, 0);
assert_eq!(rpq.items_in_queue().await, 0);
}

#[tokio::test(flavor = "multi_thread")]
@@ -895,7 +893,6 @@ mod tests {
tokio::spawn(async move {
tokio::select! {
_ = shutdown_receiver.changed() => {
return;
},
_ = async {
loop {
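
The test's shutdown arm drops a bare `return;`: falling off the end of the `select!` arm already ends the spawned task (likely clippy's `needless_return`). A runnable stand-in, assuming the tokio runtime and `watch` channel the tests already use:

```rust
#[tokio::main]
async fn main() {
    let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(false);

    let handle = tokio::spawn(async move {
        tokio::select! {
            _ = shutdown_receiver.changed() => {
                // Before: an explicit `return;` sat here; the arm's end
                // already finishes the task.
            },
            _ = async {
                loop {
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                }
            } => {},
        }
    });

    shutdown_sender.send(true).unwrap();
    handle.await.unwrap();
}
```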
@@ -919,7 +916,7 @@
i % 10,
i,
true,
Some(Duration::seconds(rand::thread_rng().gen_range(1..10))),
Some(Duration::seconds(rand::thread_rng().gen_range(1..7))),
true,
Some(Duration::seconds(10)),
);
@@ -947,7 +944,7 @@

shutdown_sender.send(true).unwrap();
rpq.close().await;
assert_eq!(rpq.len().await, 0);
assert_eq!(rpq.items_in_queue().await, 0);
assert_eq!(rpq.items_in_db(), 0);
println!(
"Sent: {}, Received: {}, Removed: {}, Escalated: {}",
@@ -992,7 +989,7 @@
}

tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
assert!(rpq.len().await == message_count);
assert!(rpq.items_in_queue().await >= message_count);
assert!(rpq.items_in_db() != 0);

for _i in 0..message_count {
@@ -1005,19 +1002,19 @@
}

rpq.close().await;
assert_eq!(rpq.len().await, 0);
assert_eq!(rpq.items_in_queue().await, 0);
assert_eq!(rpq.items_in_db(), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn e2e_no_batch() {
// Set Message Count
let message_count = 10_000_250 as usize;
let message_count = 10_000_250_usize;

// Set Concurrency
let send_threads = 4 as usize;
let receive_threads = 4 as usize;
let bucket_count = 10 as usize;
let send_threads = 4_usize;
let receive_threads = 4_usize;
let bucket_count = 10_usize;
let sent_counter = Arc::new(AtomicUsize::new(0));
let received_counter = Arc::new(AtomicUsize::new(0));
let removed_counter = Arc::new(AtomicUsize::new(0));
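
The `as usize` casts on integer literals become type suffixes (clippy: `unnecessary_cast`). A minimal sketch:

```rust
fn main() {
    // Before: let message_count = 10_000_250 as usize;
    // After: the literal carries its own type.
    let message_count = 10_000_250_usize;
    let send_threads = 4_usize;
    assert_eq!(message_count % send_threads, 2);
}
```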
@@ -1045,14 +1042,13 @@
tokio::spawn(async move {
tokio::select! {
_ = shutdown_receiver.changed() => {
return;
},
_ = async {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let results = rpq_clone.prioritize().await;

if !results.is_ok() {
if results.is_err() {
let (removed, escalated) = results.unwrap();
removed_clone.fetch_add(removed, Ordering::SeqCst);
escalated_clone.fetch_add(escalated, Ordering::SeqCst);
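
`!results.is_ok()` is minimized to `results.is_err()` (clippy: `nonminimal_bool`). Note that the guarded `results.unwrap()` in the test would still panic if this branch were ever taken; the commit only rewrites the condition. A sketch of the boolean change alone:

```rust
fn main() {
    let results: Result<(usize, usize), String> = Err("prioritize failed".into());

    // Before: if !results.is_ok() { ... }
    // After (as in the commit):
    if results.is_err() {
        eprintln!("prioritize error: {:?}", results.err());
    }
}
```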
@@ -1107,13 +1103,12 @@
// Spawn the thread
receive_handles.push(tokio::spawn(async move {
loop {
if finished_sending_clone.load(Ordering::SeqCst) {
if received_clone.load(Ordering::SeqCst)
if finished_sending_clone.load(Ordering::SeqCst)
&& received_clone.load(Ordering::SeqCst)
+ removed_clone.load(Ordering::SeqCst)
>= sent_clone.load(Ordering::SeqCst) + restored_items
{
break;
}
{
break;
}

let item = rpq_clone.dequeue().await;
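
The nested `if` pair in the receive loop collapses into a single `&&` condition (clippy: `collapsible_if`). A stand-in sketch with made-up counter values:

```rust
fn main() {
    let finished_sending = true;
    let (received, removed, sent, restored_items) = (90_usize, 10, 100, 0);

    // Before:
    // if finished_sending {
    //     if received + removed >= sent + restored_items { /* break */ }
    // }
    // After (as in the commit): one condition.
    if finished_sending && received + removed >= sent + restored_items {
        println!("all items accounted for; stopping the receive loop");
    }
}
```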
@@ -1158,12 +1153,12 @@
#[tokio::test(flavor = "multi_thread")]
async fn e2e_batch() {
// Set Message Count
let message_count = 10_000_250 as usize;
let message_count = 10_000_250_usize;

// Set Concurrency
let send_threads = 4 as usize;
let receive_threads = 4 as usize;
let bucket_count = 10 as usize;
let send_threads = 4_usize;
let receive_threads = 4_usize;
let bucket_count = 10_usize;
let sent_counter = Arc::new(AtomicUsize::new(0));
let received_counter = Arc::new(AtomicUsize::new(0));
let removed_counter = Arc::new(AtomicUsize::new(0));
@@ -1191,14 +1186,13 @@
tokio::spawn(async move {
tokio::select! {
_ = shutdown_receiver.changed() => {
return;
},
_ = async {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let results = rpq_clone.prioritize().await;

if !results.is_ok() {
if results.is_err() {
let (removed, escalated) = results.unwrap();
removed_clone.fetch_add(removed, Ordering::SeqCst);
escalated_clone.fetch_add(escalated, Ordering::SeqCst);
@@ -1259,13 +1253,12 @@
// Spawn the thread
receive_handles.push(tokio::spawn(async move {
loop {
if finished_sending_clone.load(Ordering::SeqCst) {
if received_clone.load(Ordering::SeqCst)
if finished_sending_clone.load(Ordering::SeqCst)
&& received_clone.load(Ordering::SeqCst)
+ removed_clone.load(Ordering::SeqCst)
>= sent_clone.load(Ordering::SeqCst) + restored_items
{
break;
}
{
break;
}

let item = rpq_clone.dequeue_batch(1000).await;