From 1764bfef73ff863fd35dabd30b73c54b1af5865a Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Mon, 10 Feb 2025 12:24:36 +0100 Subject: [PATCH] wip Signed-off-by: Dusan Malusev --- scylla/tests/ccm_integration/main.rs | 6 + scylla/tests/ccm_integration/new_session.rs | 165 ++++++++++++++++++ scylla/tests/common/retry_policy.rs | 23 +++ scylla/tests/{integration => common}/utils.rs | 23 +-- scylla/tests/integration/main.rs | 4 +- scylla/tests/integration/new_session.rs | 38 ---- 6 files changed, 208 insertions(+), 51 deletions(-) create mode 100644 scylla/tests/ccm_integration/main.rs create mode 100644 scylla/tests/ccm_integration/new_session.rs create mode 100644 scylla/tests/common/retry_policy.rs rename scylla/tests/{integration => common}/utils.rs (94%) delete mode 100644 scylla/tests/integration/new_session.rs diff --git a/scylla/tests/ccm_integration/main.rs b/scylla/tests/ccm_integration/main.rs new file mode 100644 index 0000000000..84b1aab732 --- /dev/null +++ b/scylla/tests/ccm_integration/main.rs @@ -0,0 +1,6 @@ +mod new_session; +#[path = "../common/retry_policy.rs"] +mod retry_policy; + +#[path = "../common/utils.rs"] +mod utils; \ No newline at end of file diff --git a/scylla/tests/ccm_integration/new_session.rs b/scylla/tests/ccm_integration/new_session.rs new file mode 100644 index 0000000000..ee2a862c92 --- /dev/null +++ b/scylla/tests/ccm_integration/new_session.rs @@ -0,0 +1,165 @@ +use std::sync::Arc; +use assert_matches::assert_matches; +use scylla::client::execution_profile::ExecutionProfile; +use scylla::client::session_builder::SessionBuilder; +use scylla::errors::{ConnectionError, ConnectionPoolError, ExecutionError, NewSessionError}; +use scylla::policies::retry::DefaultRetryPolicy; + +use crate::retry_policy::NoRetryPolicy; +use crate::utils::setup_tracing; + +fn get_scylla() -> (String, String, String) { + let uri1 = std::env::var("SCYLLA_URI1").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); + let uri2 = std::env::var("SCYLLA_URI2").unwrap_or_else(|_| "127.0.0.2:9042".to_string()); + let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string()); + + (uri1, uri2, uri3) +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn proceed_if_only_some_hostnames_are_invalid() { + setup_tracing(); + // on purpose left without port + let uri1 = "scylladbisthefastestdb.invalid".to_owned(); + // correctly provided port, but unknown domain + let uri2 = "cassandrasuckssomuch.invalid:9042".to_owned(); + let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string()); + + let session = SessionBuilder::new() + .known_nodes([uri1, uri2, uri3]) + .build() + .await + .unwrap(); + + assert!(session + .query_unpaged("SELECT host_id FROM system.local", &[]) + .await + .is_ok()); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn all_hostnames_invalid() { + setup_tracing(); + let uri = "cassandrasuckssomuch.invalid:9042".to_owned(); + + assert_matches!( + SessionBuilder::new().known_node(uri).build().await, + Err(NewSessionError::FailedToResolveAnyHostname(_)) + ); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn no_nodes_available_reconnection_enabled() { + setup_tracing(); + + let execution_profile = ExecutionProfile::builder() + .retry_policy(Arc::new(DefaultRetryPolicy::default())) + .build(); + + assert!(SessionBuilder::new() + .default_execution_profile_handle(execution_profile.into_handle()) + .build() + .await + .is_ok()); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn no_nodes_available_reconnection_disabled() { + setup_tracing(); + // TODO: Replace with CCM + let (uri1, uri2, uri3) = get_scylla(); + + let execution_profile = ExecutionProfile::builder() + .retry_policy(Arc::new(NoRetryPolicy)) + .build(); + + assert_matches!( + SessionBuilder::new() + .known_nodes([uri1, uri2, uri3]) + .default_execution_profile_handle(execution_profile.into_handle()) + .build() + .await, + Err(NewSessionError::FailedToResolveAnyHostname(_)) + ); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn no_nodes_available_reconnection_enabled_nodes_coming_back() { + setup_tracing(); + // TODO: Setup CCM + + let execution_profile = ExecutionProfile::builder() + .retry_policy(Arc::new(DefaultRetryPolicy::default())) + .build(); + + assert!(SessionBuilder::new() + .default_execution_profile_handle(execution_profile.into_handle()) + .build() + .await + .is_ok()); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn session_created_nodes_away_reconnection_enabled() { + setup_tracing(); + + let execution_profile = ExecutionProfile::builder() + .retry_policy(Arc::new(DefaultRetryPolicy::default())) + .build(); + + let _session = SessionBuilder::new() + .default_execution_profile_handle(execution_profile.into_handle()) + .build() + .await + .unwrap(); + + assert!(true); +} + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +async fn session_created_nodes_away_reconnection_disabled() { + setup_tracing(); + + // TODO: Replace with CCM + let (uri1, uri2, uri3) = get_scylla(); + + let execution_profile = ExecutionProfile::builder() + .retry_policy(Arc::new(NoRetryPolicy)) + .build(); + + let session = SessionBuilder::new() + .known_nodes([uri1, uri2, uri3]) + .default_execution_profile_handle(execution_profile.into_handle()) + .build() + .await + .unwrap(); + + // TODO: Everything should be fine + assert!(session + .query_unpaged("SELECT host_id FROM system.local", &[]) + .await + .is_ok()); + + // TODO: Stop the nodes + + // TODO: Check the connection -> fails to execute query + assert_matches!( + session + .query_unpaged("SELECT host_id FROM system.local", &[]) + .await, + Err(ExecutionError::ConnectionPoolError( + ConnectionPoolError::Broken { + last_connection_error: ConnectionError::BrokenConnection(_), + } + )) + ); + + assert!(true); +} diff --git a/scylla/tests/common/retry_policy.rs b/scylla/tests/common/retry_policy.rs new file mode 100644 index 0000000000..2e10eff1b2 --- /dev/null +++ b/scylla/tests/common/retry_policy.rs @@ -0,0 +1,23 @@ +use scylla::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}; +use std::fmt::Debug; + +#[derive(Debug, Clone)] +pub(crate) struct NoRetryPolicy; + +#[derive(Debug, Clone)] +pub(crate) struct NoRetrySession; + + +impl RetryPolicy for NoRetryPolicy { + fn new_session(&self) -> Box { + Box::new(NoRetrySession) + } +} + +impl RetrySession for NoRetrySession { + fn decide_should_retry(&mut self, _request_info: RequestInfo) -> RetryDecision { + RetryDecision::DontRetry + } + + fn reset(&mut self) {} +} diff --git a/scylla/tests/integration/utils.rs b/scylla/tests/common/utils.rs similarity index 94% rename from scylla/tests/integration/utils.rs rename to scylla/tests/common/utils.rs index d67f887d4f..3cc26c6b22 100644 --- a/scylla/tests/integration/utils.rs +++ b/scylla/tests/common/utils.rs @@ -22,7 +22,7 @@ use tracing::{error, warn}; use scylla_proxy::{Node, Proxy, ProxyError, RunningProxy, ShardAwareness}; -pub(crate) fn setup_tracing() { +pub fn setup_tracing() { let _ = tracing_subscriber::fmt::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_writer(tracing_subscriber::fmt::TestWriter::new()) @@ -31,7 +31,7 @@ pub(crate) fn setup_tracing() { static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0); -pub(crate) fn unique_keyspace_name() -> String { +pub(super) fn unique_keyspace_name() -> String { let cnt = UNIQUE_COUNTER.fetch_add(1, Ordering::SeqCst); let name = format!( "test_rust_{}_{}", @@ -45,7 +45,7 @@ pub(crate) fn unique_keyspace_name() -> String { name } -pub(crate) async fn test_with_3_node_cluster( +pub(super) async fn test_with_3_node_cluster( shard_awareness: ShardAwareness, test: F, ) -> Result<(), ProxyError> @@ -95,7 +95,7 @@ where running_proxy.finish().await } -pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool { +pub(super) async fn supports_feature(session: &Session, feature: &str) -> bool { // Cassandra doesn't have a concept of features, so first detect // if there is the `supported_features` column in system.local @@ -127,14 +127,15 @@ pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool { .any(|f| f == feature) } -pub(crate) async fn scylla_supports_tablets(session: &Session) -> bool { +pub(super) async fn scylla_supports_tablets(session: &Session) -> bool { supports_feature(session, "TABLETS").await } + // Creates a generic session builder based on conditional compilation configuration // For SessionBuilder of DefaultMode type, adds localhost to known hosts, as all of the tests // connect to localhost. -pub(crate) fn create_new_session_builder() -> GenericSessionBuilder { +pub(super) fn create_new_session_builder() -> GenericSessionBuilder { let session_builder = { #[cfg(not(scylla_cloud_tests))] { @@ -168,7 +169,7 @@ pub(crate) fn create_new_session_builder() -> GenericSessionBuilder DeserializeValue<'frame, 'metadata> { } @@ -182,7 +183,7 @@ impl DeserializeOwnedValue for T where // This is to make sure that all DDL queries land on the same node, // to prevent errors from concurrent DDL queries executed on different nodes. #[derive(Debug)] -struct SchemaQueriesLBP; +pub(super) struct SchemaQueriesLBP; impl LoadBalancingPolicy for SchemaQueriesLBP { fn pick<'a>( @@ -210,7 +211,7 @@ impl LoadBalancingPolicy for SchemaQueriesLBP { } #[derive(Debug, Default)] -struct SchemaQueriesRetrySession { +pub(super) struct SchemaQueriesRetrySession { count: usize, } @@ -246,7 +247,7 @@ struct SchemaQueriesRetryPolicy; impl RetryPolicy for SchemaQueriesRetryPolicy { fn new_session(&self) -> Box { - Box::new(SchemaQueriesRetrySession::default()) + Box::::default() } } @@ -265,7 +266,7 @@ fn apply_ddl_lbp(query: &mut Query) { // we'll be able to do session.ddl(...) instead of perform_ddl(&session, ...) // or something like that. #[async_trait::async_trait] -pub(crate) trait PerformDDL { +pub trait PerformDDL { async fn ddl(&self, query: impl Into + Send) -> Result<(), ExecutionError>; } diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index 86b529dc08..80fc1a8909 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -10,7 +10,6 @@ mod history; mod hygiene; mod large_batch_statements; mod lwt_optimisation; -mod new_session; mod retries; mod self_identity; mod shards; @@ -18,4 +17,5 @@ mod silent_prepare_batch; mod silent_prepare_query; mod skip_metadata_optimization; mod tablets; -pub(crate) mod utils; +#[path = "../common/utils.rs"] +mod utils; diff --git a/scylla/tests/integration/new_session.rs b/scylla/tests/integration/new_session.rs deleted file mode 100644 index bbb89bc5bc..0000000000 --- a/scylla/tests/integration/new_session.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::utils::setup_tracing; - -use assert_matches::assert_matches; -use scylla::client::session_builder::SessionBuilder; -use scylla::errors::NewSessionError; - -#[cfg(not(scylla_cloud_tests))] -#[tokio::test] -async fn proceed_if_only_some_hostnames_are_invalid() { - setup_tracing(); - // on purpose left without port - let uri1 = "scylladbisthefastestdb.invalid".to_owned(); - // correctly provided port, but unknown domain - let uri2 = "cassandrasuckssomuch.invalid:9042".to_owned(); - let uri3 = std::env::var("SCYLLA_URI3").unwrap_or_else(|_| "127.0.0.3:9042".to_string()); - - let session = SessionBuilder::new() - .known_nodes([uri1, uri2, uri3]) - .build() - .await - .unwrap(); - session - .query_unpaged("SELECT host_id FROM system.local", &[]) - .await - .unwrap(); -} - -#[cfg(not(scylla_cloud_tests))] -#[tokio::test] -async fn all_hostnames_invalid() { - setup_tracing(); - let uri = "cassandrasuckssomuch.invalid:9042".to_owned(); - - assert_matches!( - SessionBuilder::new().known_node(uri).build().await, - Err(NewSessionError::FailedToResolveAnyHostname(_)) - ); -}