Integration test for CachingSession
Adds the integration test `caching_session::ensure_cache_is_used`
for `CachingSession`. The test sends several queries through `scylla_proxy`
and verifies that the number of prepare requests is exactly as expected.
If the cache is working correctly, no extra prepare requests should appear;
if it is not, the assertions will bring attention to that.
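
For context, here is a minimal sketch (not part of this commit) of the caching behavior under test. It assumes a reachable node at 127.0.0.1:9042 and an existing table ks.tab; `CachingSession::from` and `execute_unpaged` are the same driver APIs the test below exercises.

use scylla::client::caching_session::CachingSession;
use scylla::client::session_builder::SessionBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;
    // Cache up to 100 prepared statements, keyed by statement text.
    let caching_session = CachingSession::from(session, 100);
    // The first execution prepares the statement before executing it.
    caching_session.execute_unpaged("SELECT * FROM ks.tab", &[]).await?;
    // Repeated executions reuse the cached prepared statement, so no
    // further prepare requests should appear on the wire.
    caching_session.execute_unpaged("SELECT * FROM ks.tab", &[]).await?;
    Ok(())
}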
Bouncheck committed Feb 12, 2025
1 parent a253772 commit d6f9285
Showing 2 changed files with 141 additions and 0 deletions.
scylla/tests/integration/caching_session.rs: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
use std::num::NonZero;
use std::sync::Arc;

use crate::utils::test_with_3_node_cluster;
use crate::utils::{setup_tracing, unique_keyspace_name, PerformDDL};
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::client::caching_session::CachingSession;
use scylla_proxy::RequestOpcode;
use scylla_proxy::RequestReaction;
use scylla_proxy::RequestRule;
use scylla_proxy::ShardAwareness;
use scylla_proxy::{Condition, ProxyError, Reaction, RequestFrame, TargetShard, WorkerError};
use tokio::sync::mpsc;

fn consume_current_feedbacks(
    rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option<TargetShard>)>,
) -> usize {
    std::iter::from_fn(|| rx.try_recv().ok()).count()
}

#[tokio::test]
#[cfg(not(scylla_cloud_tests))]
async fn ensure_cache_is_used() {
    setup_tracing();
    let res = test_with_3_node_cluster(
        ShardAwareness::QueryNode,
        |proxy_uris, translation_map, mut running_proxy| async move {
            let session = scylla::client::session_builder::SessionBuilder::new()
                .known_node(proxy_uris[0].as_str())
                .address_translator(Arc::new(translation_map))
                .build()
                .await
                .unwrap();

            let ks = unique_keyspace_name();

            let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..3)
                .map(|_| mpsc::unbounded_channel::<(RequestFrame, Option<TargetShard>)>())
                .unzip();
            for (i, tx) in feedback_txs.iter().cloned().enumerate() {
                running_proxy.running_nodes[i].change_request_rules(Some(vec![RequestRule(
                    Condition::and(
                        Condition::RequestOpcode(RequestOpcode::Prepare),
                        Condition::not(Condition::ConnectionRegisteredAnyEvent),
                    ),
                    RequestReaction::noop().with_feedback_when_performed(tx),
                )]));
            }

            session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
            session.use_keyspace(ks, false).await.unwrap();
            session
                .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))")
                .await
                .unwrap();
            // Assumption: all nodes have the same number of shards.
            let nr_shards = match session.get_cluster_state().get_nodes_info().first() {
                Some(node_info) => match node_info.sharder() {
                    Some(sharder) => sharder.nr_shards,
                    None => NonZero::new(1).unwrap(), // If there is no sharder, assume 1 shard.
                },
                None => panic!("No node information available"),
            };

            // Consume all feedbacks so far to ensure we will not count something unrelated.
            let _feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();

            let caching_session: CachingSession = CachingSession::from(session, 100);
            let insert_1_b_c = "INSERT INTO tab (a, b, c) VALUES (1, ?, ?)";
            let insert_2_b_c = "INSERT INTO tab (a, b, c) VALUES (2, ?, ?)";
            let insert_3_b_c = "INSERT INTO tab (a, b, c) VALUES (3, ?, ?)";
            let insert_4_b_c = "INSERT INTO tab (a, b, c) VALUES (4, ?, ?)";
            let mut batch = Batch::new(BatchType::Logged);
            batch.append_statement(insert_1_b_c);
            batch.append_statement(insert_2_b_c);
            batch.append_statement(insert_3_b_c);
            batch.append_statement(insert_4_b_c);
            // The first batch should generate prepares for each shard.
            let batch_values = vec![(1, 1), (2, 2), (3, 3), (4, 4)];
            caching_session.batch(&batch, batch_values.clone()).await.unwrap();
            // A few extra runs; these batches should not result in any prepares being sent.
            caching_session.batch(&batch, batch_values.clone()).await.unwrap();
            caching_session.batch(&batch, batch_values.clone()).await.unwrap();
            caching_session.batch(&batch, batch_values.clone()).await.unwrap();
            caching_session.batch(&batch, batch_values.clone()).await.unwrap();

            let mut prepared_batch_res_rows: Vec<(i32, i32, i32)> = caching_session
                .execute_unpaged("SELECT * FROM tab", &[])
                .await
                .unwrap()
                .into_rows_result()
                .unwrap()
                .rows()
                .unwrap()
                .map(|r| r.unwrap())
                .collect();

            // Verify the data from the inserts.
            prepared_batch_res_rows.sort();
            let expected_rows = vec![(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)];
            assert_eq!(prepared_batch_res_rows, expected_rows);

            let feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
            let expected_feedbacks = (4 + 1) * nr_shards.get() as usize * 3; // (4 inserts + 1 select) per shard, on 3 nodes
            assert_eq!(feedbacks, expected_feedbacks);

            // Run some ALTERs to invalidate the server-side cache, similarly to scylla/src/session_test.rs.
            caching_session.ddl("ALTER TABLE tab RENAME c to tmp").await.unwrap();
            caching_session.ddl("ALTER TABLE tab RENAME b to c").await.unwrap();
            caching_session.ddl("ALTER TABLE tab RENAME tmp to b").await.unwrap();

            // The execute_unpaged calls triggered by the ALTERs likely resulted in some
            // prepares being sent. Consume those frames.
            feedback_rxs.iter_mut().map(consume_current_feedbacks).sum::<usize>();

            // Run the batch many times per node to make sure we hit every one of them.
            // The server-side cache was invalidated, but a reprepare is required only when
            // we hit a node whose cache was invalidated. Only the corresponding shards
            // are targeted, so this time the expected number of prepares is not multiplied
            // by nr_shards.
            for _ in 0..(64 * 3) {
                caching_session.batch(&batch, batch_values.clone()).await.unwrap();
            }

            let feedbacks: usize = feedback_rxs.iter_mut().map(consume_current_feedbacks).sum();
            let expected_feedbacks = 4 * 3; // 4 inserts, 3 nodes
            assert_eq!(feedbacks, expected_feedbacks);
            running_proxy
        },
    )
    .await;
    match res {
        Ok(()) => (),
        Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
        Err(err) => panic!("{}", err),
    }
}
scylla/tests/integration/main.rs: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
mod authenticate;
mod batch;
mod caching_session;
mod consistency;
mod cql_collections;
mod cql_types;