Skip to content

Commit

Permalink
Merge pull request #1204 from muzarski/inline-request-error
Browse files Browse the repository at this point in the history
errors: final cleanups (restructure ExecutionError and introduce SchemaAgreementError)
  • Loading branch information
wprzytula authored Feb 10, 2025
2 parents 43f758f + 34afac7 commit 86efc40
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 262 deletions.
6 changes: 4 additions & 2 deletions examples/schema_agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Result};
use futures::TryStreamExt as _;
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::errors::ExecutionError;
use scylla::errors::SchemaAgreementError;
use std::env;
use std::time::Duration;

Expand All @@ -27,7 +27,9 @@ async fn main() -> Result<()> {

match session.await_schema_agreement().await {
Ok(_schema_version) => println!("Schema is in agreement in time"),
Err(ExecutionError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"),
Err(SchemaAgreementError::Timeout(_)) => {
println!("Schema is NOT in agreement in time")
}
Err(err) => bail!(err),
};
session
Expand Down
42 changes: 42 additions & 0 deletions scylla-cql/src/frame/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Error {
/// An error sent from the database in response to a query
/// as described in the [specification](https://github.com/apache/cassandra/blob/5ed5e84613ef0e9664a774493db7d2604e3596e0/doc/native_protocol_v4.spec#L1029)\
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DbError {
/// The submitted query has a syntax error
#[error("The submitted query has a syntax error")]
Expand Down Expand Up @@ -386,6 +387,47 @@ impl DbError {
} => protocol_features.rate_limit_error.unwrap(),
}
}

/// Decides whether the error can be ignored. If true, the driver can perform
/// a speculative retry to the next target.
pub fn can_speculative_retry(&self) -> bool {
// Do not remove this lint!
// It's there for a reason - we don't want new variants
// automatically fall under `_` pattern when they are introduced.
#[deny(clippy::wildcard_enum_match_arm)]
match self {
// Errors that will almost certainly appear on other nodes as well
DbError::SyntaxError
| DbError::Invalid
| DbError::AlreadyExists { .. }
| DbError::Unauthorized
| DbError::ProtocolError => false,

// Errors that should not appear there - thus, should not be ignored.
DbError::AuthenticationError | DbError::Other(_) => false,

// For now, let's assume that UDF failure is not transient - don't ignore it
// TODO: investigate
DbError::FunctionFailure { .. } => false,

// Not sure when these can appear - don't ignore them
// TODO: Investigate these errors
DbError::ConfigError | DbError::TruncateError => false,

// Errors that we can ignore and perform a retry on some other node
DbError::Unavailable { .. }
| DbError::Overloaded
| DbError::IsBootstrapping
| DbError::ReadTimeout { .. }
| DbError::WriteTimeout { .. }
| DbError::ReadFailure { .. }
| DbError::WriteFailure { .. }
// Preparation may succeed on some other node.
| DbError::Unprepared { .. }
| DbError::ServerError
| DbError::RateLimitReached { .. } => true,
}
}
}

/// Type of the operation rejected by rate limiting
Expand Down
26 changes: 13 additions & 13 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
ProtocolError, RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
Expand Down Expand Up @@ -867,7 +867,9 @@ impl Session {
.await?;
if !paging_state_response.finished() {
error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
return Err(ProtocolError::NonfinishedPagingState.into());
return Err(ExecutionError::LastAttemptError(
RequestAttemptError::NonfinishedPagingState,
));
}
Ok(result)
}
Expand Down Expand Up @@ -988,9 +990,7 @@ impl Session {
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_execution_error)?;
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1175,7 +1175,9 @@ impl Session {
.await?;
if !paging_state.finished() {
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
return Err(ProtocolError::NonfinishedPagingState.into());
return Err(ExecutionError::LastAttemptError(
RequestAttemptError::NonfinishedPagingState,
));
}
Ok(result)
}
Expand Down Expand Up @@ -1295,9 +1297,7 @@ impl Session {
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_execution_error)?;
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1938,7 +1938,7 @@ impl Session {
last_error.map(Result::Err)
}

async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, ExecutionError> {
async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
loop {
tokio::time::sleep(self.schema_agreement_interval).await;
if let Some(agreed_version) = self.check_schema_agreement().await? {
Expand All @@ -1947,18 +1947,18 @@ impl Session {
}
}

pub async fn await_schema_agreement(&self) -> Result<Uuid, ExecutionError> {
pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
timeout(
self.schema_agreement_timeout,
self.await_schema_agreement_indefinitely(),
)
.await
.unwrap_or(Err(ExecutionError::SchemaAgreementTimeout(
.unwrap_or(Err(SchemaAgreementError::Timeout(
self.schema_agreement_timeout,
)))
}

pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, ExecutionError> {
pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
let cluster_state = self.get_cluster_state();
let connections_iter = cluster_state.iter_working_connections()?;

Expand Down
17 changes: 12 additions & 5 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::cluster::metadata::{
CollectionType, ColumnKind, ColumnType, NativeType, UserDefinedType,
};
use crate::deserialize::DeserializeOwnedValue;
use crate::errors::{BadKeyspaceName, DbError, ExecutionError, UseKeyspaceError};
use crate::errors::{
BadKeyspaceName, DbError, ExecutionError, RequestAttemptError, UseKeyspaceError,
};
use crate::observability::tracing::TracingInfo;
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::prepared_statement::PreparedStatement;
Expand Down Expand Up @@ -948,15 +950,17 @@ async fn test_db_errors() {
// SyntaxError on bad query
assert!(matches!(
session.query_unpaged("gibberish", &[]).await,
Err(ExecutionError::DbError(DbError::SyntaxError, _))
Err(ExecutionError::LastAttemptError(
RequestAttemptError::DbError(DbError::SyntaxError, _)
))
));

// AlreadyExists when creating a keyspace for the second time
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();

let create_keyspace_res = session.ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await;
let keyspace_exists_error: DbError = match create_keyspace_res {
Err(ExecutionError::DbError(e, _)) => e,
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
_ => panic!("Second CREATE KEYSPACE didn't return an error!"),
};

Expand All @@ -981,7 +985,7 @@ async fn test_db_errors() {
.ddl(format!("CREATE TABLE {}.tab (a text primary key)", ks))
.await;
let create_tab_error: DbError = match create_table_res {
Err(ExecutionError::DbError(e, _)) => e,
Err(ExecutionError::LastAttemptError(RequestAttemptError::DbError(e, _))) => e,
_ => panic!("Second CREATE TABLE didn't return an error!"),
};

Expand Down Expand Up @@ -2621,7 +2625,10 @@ async fn test_rate_limit_exceeded_exception() {
use crate::errors::OperationType;

match maybe_err.expect("Rate limit error didn't occur") {
ExecutionError::DbError(DbError::RateLimitReached { op_type, .. }, _) => {
ExecutionError::LastAttemptError(RequestAttemptError::DbError(
DbError::RateLimitReached { op_type, .. },
_,
)) => {
assert_eq!(op_type, OperationType::Write);
}
err => panic!("Unexpected error type received: {:?}", err),
Expand Down
12 changes: 4 additions & 8 deletions scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::{BadQuery, ConnectionPoolError};
use crate::errors::ConnectionPoolError;
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
use crate::policies::host_filter::HostFilter;
use crate::prepared_statement::TokenCalculationError;
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ClusterState {
keyspace: &str,
table: &str,
partition_key: &SerializedValues,
) -> Result<Token, BadQuery> {
) -> Result<Token, TokenCalculationError> {
let partitioner = self
.keyspaces
.get(keyspace)
Expand All @@ -213,11 +213,7 @@ impl ClusterState {
.and_then(PartitionerName::from_str)
.unwrap_or_default();

calculate_token_for_partition_key(partition_key, &partitioner).map_err(|err| match err {
TokenCalculationError::ValueTooLong(values_len) => {
BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into())
}
})
calculate_token_for_partition_key(partition_key, &partitioner)
}

/// Access to replicas owning a given token
Expand Down Expand Up @@ -255,7 +251,7 @@ impl ClusterState {
keyspace: &str,
table: &str,
partition_key: &SerializedValues,
) -> Result<Vec<(Arc<Node>, Shard)>, BadQuery> {
) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
let token = self.compute_token(keyspace, table, partition_key)?;
Ok(self.get_token_endpoints(keyspace, table, token))
}
Expand Down
Loading

0 comments on commit 86efc40

Please sign in to comment.