Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <dusan.malusev@scylladb.com>
  • Loading branch information
CodeLieutenant committed Feb 10, 2025
1 parent 86efc40 commit 1764bfe
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 51 deletions.
6 changes: 6 additions & 0 deletions scylla/tests/ccm_integration/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod new_session;
#[path = "../common/retry_policy.rs"]
mod retry_policy;

#[path = "../common/utils.rs"]
mod utils;
165 changes: 165 additions & 0 deletions scylla/tests/ccm_integration/new_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::sync::Arc;
use assert_matches::assert_matches;
use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session_builder::SessionBuilder;
use scylla::errors::{ConnectionError, ConnectionPoolError, ExecutionError, NewSessionError};
use scylla::policies::retry::DefaultRetryPolicy;

use crate::retry_policy::NoRetryPolicy;
use crate::utils::setup_tracing;

fn get_scylla() -> (String, String, String) {
let uri1 = std::env::var("SCYLLA_URI1").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let uri2 = std::env::var("SCYLLA_URI2").unwrap_or_else(|_| "127.0.0.2:9042".to_string());
let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string());

(uri1, uri2, uri3)
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn proceed_if_only_some_hostnames_are_invalid() {
setup_tracing();
// on purpose left without port
let uri1 = "scylladbisthefastestdb.invalid".to_owned();
// correctly provided port, but unknown domain
let uri2 = "cassandrasuckssomuch.invalid:9042".to_owned();
let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string());

let session = SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.build()
.await
.unwrap();

assert!(session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn all_hostnames_invalid() {
setup_tracing();
let uri = "cassandrasuckssomuch.invalid:9042".to_owned();

assert_matches!(
SessionBuilder::new().known_node(uri).build().await,
Err(NewSessionError::FailedToResolveAnyHostname(_))
);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_enabled() {
setup_tracing();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy::default()))
.build();

assert!(SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_disabled() {
setup_tracing();
// TODO: Replace with CCM
let (uri1, uri2, uri3) = get_scylla();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(NoRetryPolicy))
.build();

assert_matches!(
SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await,
Err(NewSessionError::FailedToResolveAnyHostname(_))
);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn no_nodes_available_reconnection_enabled_nodes_coming_back() {
setup_tracing();
// TODO: Setup CCM

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy::default()))
.build();

assert!(SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.is_ok());
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn session_created_nodes_away_reconnection_enabled() {
setup_tracing();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(DefaultRetryPolicy::default()))
.build();

let _session = SessionBuilder::new()
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.unwrap();

assert!(true);
}

#[cfg(not(scylla_cloud_tests))]
#[tokio::test]
async fn session_created_nodes_away_reconnection_disabled() {
setup_tracing();

// TODO: Replace with CCM
let (uri1, uri2, uri3) = get_scylla();

let execution_profile = ExecutionProfile::builder()
.retry_policy(Arc::new(NoRetryPolicy))
.build();

let session = SessionBuilder::new()
.known_nodes([uri1, uri2, uri3])
.default_execution_profile_handle(execution_profile.into_handle())
.build()
.await
.unwrap();

// TODO: Everything should be fine
assert!(session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await
.is_ok());

// TODO: Stop the nodes

// TODO: Check the connection -> fails to execute query
assert_matches!(
session
.query_unpaged("SELECT host_id FROM system.local", &[])
.await,
Err(ExecutionError::ConnectionPoolError(
ConnectionPoolError::Broken {
last_connection_error: ConnectionError::BrokenConnection(_),
}
))
);

assert!(true);
}
23 changes: 23 additions & 0 deletions scylla/tests/common/retry_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use scylla::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use std::fmt::Debug;

#[derive(Debug, Clone)]
pub(crate) struct NoRetryPolicy;

#[derive(Debug, Clone)]
pub(crate) struct NoRetrySession;


impl RetryPolicy for NoRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(NoRetrySession)
}
}

impl RetrySession for NoRetrySession {
fn decide_should_retry(&mut self, _request_info: RequestInfo) -> RetryDecision {
RetryDecision::DontRetry
}

fn reset(&mut self) {}
}
23 changes: 12 additions & 11 deletions scylla/tests/integration/utils.rs → scylla/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::{error, warn};

use scylla_proxy::{Node, Proxy, ProxyError, RunningProxy, ShardAwareness};

pub(crate) fn setup_tracing() {
pub fn setup_tracing() {
let _ = tracing_subscriber::fmt::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_writer(tracing_subscriber::fmt::TestWriter::new())
Expand All @@ -31,7 +31,7 @@ pub(crate) fn setup_tracing() {

static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0);

pub(crate) fn unique_keyspace_name() -> String {
pub(super) fn unique_keyspace_name() -> String {
let cnt = UNIQUE_COUNTER.fetch_add(1, Ordering::SeqCst);
let name = format!(
"test_rust_{}_{}",
Expand All @@ -45,7 +45,7 @@ pub(crate) fn unique_keyspace_name() -> String {
name
}

pub(crate) async fn test_with_3_node_cluster<F, Fut>(
pub(super) async fn test_with_3_node_cluster<F, Fut>(
shard_awareness: ShardAwareness,
test: F,
) -> Result<(), ProxyError>
Expand Down Expand Up @@ -95,7 +95,7 @@ where
running_proxy.finish().await
}

pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool {
pub(super) async fn supports_feature(session: &Session, feature: &str) -> bool {
// Cassandra doesn't have a concept of features, so first detect
// if there is the `supported_features` column in system.local

Expand Down Expand Up @@ -127,14 +127,15 @@ pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool {
.any(|f| f == feature)
}

pub(crate) async fn scylla_supports_tablets(session: &Session) -> bool {
pub(super) async fn scylla_supports_tablets(session: &Session) -> bool {
supports_feature(session, "TABLETS").await
}


// Creates a generic session builder based on conditional compilation configuration
// For SessionBuilder of DefaultMode type, adds localhost to known hosts, as all of the tests
// connect to localhost.
pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl SessionBuilderKind> {
pub(super) fn create_new_session_builder() -> GenericSessionBuilder<impl SessionBuilderKind> {
let session_builder = {
#[cfg(not(scylla_cloud_tests))]
{
Expand Down Expand Up @@ -168,7 +169,7 @@ pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl Session

// Shorthands for better readability.
// Copied from Scylla because we don't want to make it public there.
pub(crate) trait DeserializeOwnedValue:
pub(super) trait DeserializeOwnedValue:
for<'frame, 'metadata> DeserializeValue<'frame, 'metadata>
{
}
Expand All @@ -182,7 +183,7 @@ impl<T> DeserializeOwnedValue for T where
// This is to make sure that all DDL queries land on the same node,
// to prevent errors from concurrent DDL queries executed on different nodes.
#[derive(Debug)]
struct SchemaQueriesLBP;
pub(super) struct SchemaQueriesLBP;

impl LoadBalancingPolicy for SchemaQueriesLBP {
fn pick<'a>(
Expand Down Expand Up @@ -210,7 +211,7 @@ impl LoadBalancingPolicy for SchemaQueriesLBP {
}

#[derive(Debug, Default)]
struct SchemaQueriesRetrySession {
pub(super) struct SchemaQueriesRetrySession {
count: usize,
}

Expand Down Expand Up @@ -246,7 +247,7 @@ struct SchemaQueriesRetryPolicy;

impl RetryPolicy for SchemaQueriesRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(SchemaQueriesRetrySession::default())
Box::<SchemaQueriesRetrySession>::default()
}
}

Expand All @@ -265,7 +266,7 @@ fn apply_ddl_lbp(query: &mut Query) {
// we'll be able to do session.ddl(...) instead of perform_ddl(&session, ...)
// or something like that.
#[async_trait::async_trait]
pub(crate) trait PerformDDL {
pub trait PerformDDL {
async fn ddl(&self, query: impl Into<Query> + Send) -> Result<(), ExecutionError>;
}

Expand Down
4 changes: 2 additions & 2 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ mod history;
mod hygiene;
mod large_batch_statements;
mod lwt_optimisation;
mod new_session;
mod retries;
mod self_identity;
mod shards;
mod silent_prepare_batch;
mod silent_prepare_query;
mod skip_metadata_optimization;
mod tablets;
pub(crate) mod utils;
#[path = "../common/utils.rs"]
mod utils;
38 changes: 0 additions & 38 deletions scylla/tests/integration/new_session.rs

This file was deleted.

0 comments on commit 1764bfe

Please sign in to comment.