Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes for idempotency, error messages and concurrency #1552

Merged
merged 6 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions io-engine/src/bdev/nexus/nexus_bdev_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ pub enum Error {
name
))]
RemoveLastHealthyChild { child: String, name: String },
#[snafu(display(
"Cannot remove or offline the last healthy child {} of nexus {}",
child,
name
))]
#[snafu(display("Child {} of nexus {} not found", child, name))]
ChildNotFound { child: String, name: String },
#[snafu(display("Child {} of nexus {} is not open", child, name))]
ChildDeviceNotOpen { child: String, name: String },
Expand Down
4 changes: 3 additions & 1 deletion io-engine/src/bdev/nexus/nexus_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ impl<'n> From<&Nexus<'n>> for NexusPtpl {
impl PtplFileOps for NexusPtpl {
fn destroy(&self) -> Result<(), std::io::Error> {
if let Some(path) = self.path() {
std::fs::remove_file(path)?;
if path.exists() {
std::fs::remove_file(path)?;
}
}
Ok(())
}
Expand Down
5 changes: 1 addition & 4 deletions io-engine/src/bdev_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ pub enum BdevError {
#[snafu(display("BDEV '{}' could not be found", name))]
BdevNotFound { name: String },
// Invalid creation parameters.
#[snafu(display(
"Failed to create a BDEV '{}': invalid parameters",
name
))]
#[snafu(display("Failed to create a BDEV '{}'", name))]
CreateBdevInvalidParams { source: Errno, name: String },
// Generic creation failure.
#[snafu(display("Failed to create a BDEV '{}'", name))]
Expand Down
23 changes: 12 additions & 11 deletions io-engine/src/bin/io-engine-client/v0/nexus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,17 +708,18 @@ async fn nexus_publish(
.get_one::<String>("key")
.cloned()
.unwrap_or_default();
let protocol = match matches.get_one::<&str>("protocol") {
None => v0::ShareProtocolNexus::NexusNbd,
Some(&"nvmf") => v0::ShareProtocolNexus::NexusNvmf,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let protocol =
match matches.get_one::<String>("protocol").map(|s| s.as_str()) {
None => v0::ShareProtocolNexus::NexusNbd,
Some("nvmf") => v0::ShareProtocolNexus::NexusNvmf,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let allowed_hosts = matches
.get_many::<String>("allowed-host")
.unwrap_or_default()
Expand Down
23 changes: 12 additions & 11 deletions io-engine/src/bin/io-engine-client/v1/nexus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,18 @@ async fn nexus_publish(
.cloned()
.unwrap_or_default();

let protocol = match matches.get_one::<&str>("protocol") {
None => v1::common::ShareProtocol::Nvmf as i32,
Some(&"nvmf") => v1::common::ShareProtocol::Nvmf as i32,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let protocol =
match matches.get_one::<String>("protocol").map(|s| s.as_str()) {
None => v1::common::ShareProtocol::Nvmf as i32,
Some("nvmf") => v1::common::ShareProtocol::Nvmf as i32,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let allowed_hosts = matches
.get_many::<String>("allowed-host")
.unwrap_or_default()
Expand Down
12 changes: 8 additions & 4 deletions io-engine/src/bin/io-engine-client/v1/test_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,14 @@ async fn replica_wipe(
.map_err(|e| Status::invalid_argument(e.to_string()))
.context(GrpcStatus)?;

let chunk_size =
parse_size(matches.get_one::<&str>("chunk-size").unwrap_or(&"0"))
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;
let chunk_size = parse_size(
matches
.get_one::<String>("chunk-size")
.map(|s| s.as_str())
.unwrap_or("0"),
)
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;
let response = ctx
.v1
.test
Expand Down
33 changes: 26 additions & 7 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor},
core::{CoreError, Reactor, VerboseError},
};

impl From<BdevError> for tonic::Status {
Expand All @@ -33,15 +33,28 @@ impl From<BdevError> for tonic::Status {
BdevError::BoolParamParseFailed {
..
} => Status::invalid_argument(e.to_string()),
BdevError::CreateBdevInvalidParams {
BdevError::UuidParamParseFailed {
..
} => Status::invalid_argument(e.to_string()),
BdevError::BdevWrongUuid {
..
} => Status::invalid_argument(e.to_string()),
BdevError::CreateBdevFailed {
source, ..
}
| BdevError::CreateBdevInvalidParams {
source, ..
} => match source {
Errno::EINVAL => Status::invalid_argument(e.to_string()),
Errno::ENOENT => Status::not_found(e.to_string()),
Errno::EEXIST => Status::already_exists(e.to_string()),
_ => Status::invalid_argument(e.to_string()),
Errno::EINVAL => Status::invalid_argument(e.verbose()),
Errno::ENOENT => Status::not_found(e.verbose()),
Errno::ENODEV => Status::not_found(e.verbose()),
Errno::EEXIST => Status::already_exists(e.verbose()),
_ => Status::invalid_argument(e.verbose()),
},
e => Status::internal(e.to_string()),
BdevError::BdevNotFound {
..
} => Status::not_found(e.to_string()),
e => Status::internal(e.verbose()),
}
}
}
Expand Down Expand Up @@ -107,6 +120,12 @@ pub(crate) trait Serializer<F, T> {
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
}

#[async_trait::async_trait]
pub(crate) trait RWSerializer<F, T> {
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
}

pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;

/// call the given future within the context of the reactor on the first core
Expand Down
8 changes: 6 additions & 2 deletions io-engine/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ impl MayastorGrpcServer {
v1::replica::ReplicaRpcServer::new(replica_v1.clone())
}))
.add_optional_service(enable_v1.map(|_| {
v1::test::TestRpcServer::new(TestService::new(replica_v1))
v1::test::TestRpcServer::new(TestService::new(
replica_v1.clone(),
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new())
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new(
replica_v1,
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::host::HostRpcServer::new(HostService::new(
Expand Down
33 changes: 27 additions & 6 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, Serializer},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWSerializer},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol},
};
use ::function_name::named;
Expand All @@ -26,17 +26,17 @@ pub struct ReplicaService {
#[allow(unused)]
name: String,
client_context:
std::sync::Arc<tokio::sync::Mutex<Option<GrpcClientContext>>>,
std::sync::Arc<tokio::sync::RwLock<Option<GrpcClientContext>>>,
}

#[async_trait::async_trait]
impl<F, T> Serializer<F, T> for ReplicaService
impl<F, T> RWSerializer<F, T> for ReplicaService
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let mut context_guard = self.client_context.lock().await;
let mut context_guard = self.client_context.write().await;

// Store context as a marker of to detect abnormal termination of the
// request. Even though AssertUnwindSafe() allows us to
Expand All @@ -57,6 +57,27 @@ where
// Request completed, remove the marker.
let ctx = context_guard.take().expect("gRPC context disappeared");

match r {
Ok(r) => r,
Err(_e) => {
warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
}
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let context_guard = self.client_context.read().await;

if let Some(c) = context_guard.as_ref() {
warn!("{}: gRPC method timed out, args: {}", c.id, c.args);
}

let fut = AssertUnwindSafe(f).catch_unwind();
let r = fut.await;

match r {
Ok(r) => r,
Err(_e) => {
Expand Down Expand Up @@ -120,7 +141,7 @@ impl ReplicaService {
pub fn new() -> Self {
Self {
name: String::from("ReplicaSvc"),
client_context: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
client_context: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
}
}
}
Expand Down Expand Up @@ -311,7 +332,7 @@ impl ReplicaRpc for ReplicaService {
&self,
request: Request<ListReplicaOptions>,
) -> GrpcResult<ListReplicasResponse> {
self.locked(GrpcClientContext::new(&request, function_name!()), async {
self.shared(GrpcClientContext::new(&request, function_name!()), async {
let args = request.into_inner();
trace!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
Expand Down
53 changes: 11 additions & 42 deletions io-engine/src/grpc/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
v1::nexus::nexus_lookup,
GrpcClientContext,
GrpcResult,
Serializer,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, Lvs, LvsLvol},
spdk_rs::ffihelper::IntoCString,
Expand All @@ -45,7 +45,7 @@ const SNAPSHOT_READY_AS_SOURCE: bool = false;
#[allow(dead_code)]
pub struct SnapshotService {
name: String,
client_context: tokio::sync::Mutex<Option<GrpcClientContext>>,
replica_svc: super::replica::ReplicaService,
}

#[derive(Debug)]
Expand Down Expand Up @@ -201,55 +201,24 @@ impl From<VolumeSnapshotDescriptor> for SnapshotInfo {
}
}
#[async_trait::async_trait]
impl<F, T> Serializer<F, T> for SnapshotService
impl<F, T> RWSerializer<F, T> for SnapshotService
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let mut context_guard = self.client_context.lock().await;

// Store context as a marker of to detect abnormal termination of the
// request. Even though AssertUnwindSafe() allows us to
// intercept asserts in underlying method strategies, such a
// situation can still happen when the high-level future that
// represents gRPC call at the highest level (i.e. the one created
// by gRPC server) gets cancelled (due to timeout or somehow else).
// This can't be properly intercepted by 'locked' function itself in the
// first place, so the state needs to be cleaned up properly
// upon subsequent gRPC calls.
if let Some(c) = context_guard.replace(ctx) {
warn!("{}: gRPC method timed out, args: {}", c.id, c.args);
}

let fut = AssertUnwindSafe(f).catch_unwind();
let r = fut.await;

// Request completed, remove the marker.
let ctx = context_guard.take().expect("gRPC context disappeared");

match r {
Ok(r) => r,
Err(_e) => {
warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
self.replica_svc.locked(ctx, f).await
}
}
impl Default for SnapshotService {
fn default() -> Self {
Self::new()
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
self.replica_svc.shared(ctx, f).await
}
}

impl SnapshotService {
pub fn new() -> Self {
pub fn new(replica_svc: super::replica::ReplicaService) -> Self {
Self {
name: String::from("SnapshotSvc"),
client_context: tokio::sync::Mutex::new(None),
replica_svc,
}
}
async fn serialized<T, F>(
Expand Down Expand Up @@ -490,7 +459,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotsRequest>,
) -> GrpcResult<ListSnapshotsResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down Expand Up @@ -739,7 +708,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotCloneRequest>,
) -> GrpcResult<ListSnapshotCloneResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Bdev,
VerboseError,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, Serializer},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWSerializer},
lvs::{Error as LvsError, Lvol, Lvs, LvsLvol},
};
use ::function_name::named;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl TestRpc for TestService {

crate::core::spawn(async move {
let result = replica_svc
.locked(
.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down
4 changes: 3 additions & 1 deletion io-engine/src/lvs/lvs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,9 @@ impl From<&Lvs> for LvsPtpl {
impl PtplFileOps for LvsPtpl {
fn destroy(&self) -> Result<(), std::io::Error> {
if let Some(path) = self.path() {
std::fs::remove_dir_all(path)?;
if path.exists() {
std::fs::remove_dir_all(path)?;
}
}
Ok(())
}
Expand Down
Loading