Skip to content

Commit

Permalink
Merge #1552
Browse files Browse the repository at this point in the history
1552: Various fixes for idempotency, error messages and concurrency r=tiagolobocastro a=tiagolobocastro

    fix(grpc/snapshot): snapshot grpc should use replica svc
    
    We should not allow concurrent snapshot and replica operations as they might clash.
    todo: allow per-resource locking
    Furthermore it was observed that holding a bdev ptr whilst it's being deleted might
    cause a crash, we need to determine if this was to do with it being held over an
    async point, or a more general problem.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    ci(test/wipe/replicas): allow list and wipe replicas
    
    System test is using wipe replicas, which can block list calls from the control-plane
    and thus confusing it thinking the node is not responding.
    Let's allow concurrent wipe with lists, since wiping doesn't destroy or modify
    replica metadata anyway.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix(io-engine/client): fix cli args
    
    Some args were not setup correctly since clap update.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix: bad error message
    
    ChildNotFound error had an invalid error message.
    Don't output NotFound messages for ptpl.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix(lvs/status): fixup missing bdev errors
    
    Allows the control-plane to determine what the inner error was.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Nov 29, 2023
2 parents 8ac0116 + a7cf92a commit 282f40c
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 96 deletions.
6 changes: 1 addition & 5 deletions io-engine/src/bdev/nexus/nexus_bdev_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ pub enum Error {
name
))]
RemoveLastHealthyChild { child: String, name: String },
#[snafu(display(
"Cannot remove or offline the last healthy child {} of nexus {}",
child,
name
))]
#[snafu(display("Child {} of nexus {} not found", child, name))]
ChildNotFound { child: String, name: String },
#[snafu(display("Child {} of nexus {} is not open", child, name))]
ChildDeviceNotOpen { child: String, name: String },
Expand Down
4 changes: 3 additions & 1 deletion io-engine/src/bdev/nexus/nexus_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ impl<'n> From<&Nexus<'n>> for NexusPtpl {
impl PtplFileOps for NexusPtpl {
fn destroy(&self) -> Result<(), std::io::Error> {
if let Some(path) = self.path() {
std::fs::remove_file(path)?;
if path.exists() {
std::fs::remove_file(path)?;
}
}
Ok(())
}
Expand Down
5 changes: 1 addition & 4 deletions io-engine/src/bdev_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ pub enum BdevError {
#[snafu(display("BDEV '{}' could not be found", name))]
BdevNotFound { name: String },
// Invalid creation parameters.
#[snafu(display(
"Failed to create a BDEV '{}': invalid parameters",
name
))]
#[snafu(display("Failed to create a BDEV '{}'", name))]
CreateBdevInvalidParams { source: Errno, name: String },
// Generic creation failure.
#[snafu(display("Failed to create a BDEV '{}'", name))]
Expand Down
23 changes: 12 additions & 11 deletions io-engine/src/bin/io-engine-client/v0/nexus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,17 +708,18 @@ async fn nexus_publish(
.get_one::<String>("key")
.cloned()
.unwrap_or_default();
let protocol = match matches.get_one::<&str>("protocol") {
None => v0::ShareProtocolNexus::NexusNbd,
Some(&"nvmf") => v0::ShareProtocolNexus::NexusNvmf,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let protocol =
match matches.get_one::<String>("protocol").map(|s| s.as_str()) {
None => v0::ShareProtocolNexus::NexusNbd,
Some("nvmf") => v0::ShareProtocolNexus::NexusNvmf,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let allowed_hosts = matches
.get_many::<String>("allowed-host")
.unwrap_or_default()
Expand Down
23 changes: 12 additions & 11 deletions io-engine/src/bin/io-engine-client/v1/nexus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,18 @@ async fn nexus_publish(
.cloned()
.unwrap_or_default();

let protocol = match matches.get_one::<&str>("protocol") {
None => v1::common::ShareProtocol::Nvmf as i32,
Some(&"nvmf") => v1::common::ShareProtocol::Nvmf as i32,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let protocol =
match matches.get_one::<String>("protocol").map(|s| s.as_str()) {
None => v1::common::ShareProtocol::Nvmf as i32,
Some("nvmf") => v1::common::ShareProtocol::Nvmf as i32,
Some(_) => {
return Err(Status::new(
Code::Internal,
"Invalid value of share protocol".to_owned(),
))
.context(GrpcStatus);
}
};
let allowed_hosts = matches
.get_many::<String>("allowed-host")
.unwrap_or_default()
Expand Down
12 changes: 8 additions & 4 deletions io-engine/src/bin/io-engine-client/v1/test_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,14 @@ async fn replica_wipe(
.map_err(|e| Status::invalid_argument(e.to_string()))
.context(GrpcStatus)?;

let chunk_size =
parse_size(matches.get_one::<&str>("chunk-size").unwrap_or(&"0"))
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;
let chunk_size = parse_size(
matches
.get_one::<String>("chunk-size")
.map(|s| s.as_str())
.unwrap_or("0"),
)
.map_err(|s| Status::invalid_argument(format!("Bad size '{s}'")))
.context(GrpcStatus)?;
let response = ctx
.v1
.test
Expand Down
33 changes: 26 additions & 7 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor},
core::{CoreError, Reactor, VerboseError},
};

impl From<BdevError> for tonic::Status {
Expand All @@ -33,15 +33,28 @@ impl From<BdevError> for tonic::Status {
BdevError::BoolParamParseFailed {
..
} => Status::invalid_argument(e.to_string()),
BdevError::CreateBdevInvalidParams {
BdevError::UuidParamParseFailed {
..
} => Status::invalid_argument(e.to_string()),
BdevError::BdevWrongUuid {
..
} => Status::invalid_argument(e.to_string()),
BdevError::CreateBdevFailed {
source, ..
}
| BdevError::CreateBdevInvalidParams {
source, ..
} => match source {
Errno::EINVAL => Status::invalid_argument(e.to_string()),
Errno::ENOENT => Status::not_found(e.to_string()),
Errno::EEXIST => Status::already_exists(e.to_string()),
_ => Status::invalid_argument(e.to_string()),
Errno::EINVAL => Status::invalid_argument(e.verbose()),
Errno::ENOENT => Status::not_found(e.verbose()),
Errno::ENODEV => Status::not_found(e.verbose()),
Errno::EEXIST => Status::already_exists(e.verbose()),
_ => Status::invalid_argument(e.verbose()),
},
e => Status::internal(e.to_string()),
BdevError::BdevNotFound {
..
} => Status::not_found(e.to_string()),
e => Status::internal(e.verbose()),
}
}
}
Expand Down Expand Up @@ -107,6 +120,12 @@ pub(crate) trait Serializer<F, T> {
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
}

#[async_trait::async_trait]
pub(crate) trait RWSerializer<F, T> {
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status>;
}

pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;

/// call the given future within the context of the reactor on the first core
Expand Down
8 changes: 6 additions & 2 deletions io-engine/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ impl MayastorGrpcServer {
v1::replica::ReplicaRpcServer::new(replica_v1.clone())
}))
.add_optional_service(enable_v1.map(|_| {
v1::test::TestRpcServer::new(TestService::new(replica_v1))
v1::test::TestRpcServer::new(TestService::new(
replica_v1.clone(),
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new())
v1::snapshot::SnapshotRpcServer::new(SnapshotService::new(
replica_v1,
))
}))
.add_optional_service(enable_v1.map(|_| {
v1::host::HostRpcServer::new(HostService::new(
Expand Down
33 changes: 27 additions & 6 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, Serializer},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWSerializer},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol},
};
use ::function_name::named;
Expand All @@ -26,17 +26,17 @@ pub struct ReplicaService {
#[allow(unused)]
name: String,
client_context:
std::sync::Arc<tokio::sync::Mutex<Option<GrpcClientContext>>>,
std::sync::Arc<tokio::sync::RwLock<Option<GrpcClientContext>>>,
}

#[async_trait::async_trait]
impl<F, T> Serializer<F, T> for ReplicaService
impl<F, T> RWSerializer<F, T> for ReplicaService
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let mut context_guard = self.client_context.lock().await;
let mut context_guard = self.client_context.write().await;

// Store context as a marker of to detect abnormal termination of the
// request. Even though AssertUnwindSafe() allows us to
Expand All @@ -57,6 +57,27 @@ where
// Request completed, remove the marker.
let ctx = context_guard.take().expect("gRPC context disappeared");

match r {
Ok(r) => r,
Err(_e) => {
warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
}
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let context_guard = self.client_context.read().await;

if let Some(c) = context_guard.as_ref() {
warn!("{}: gRPC method timed out, args: {}", c.id, c.args);
}

let fut = AssertUnwindSafe(f).catch_unwind();
let r = fut.await;

match r {
Ok(r) => r,
Err(_e) => {
Expand Down Expand Up @@ -120,7 +141,7 @@ impl ReplicaService {
pub fn new() -> Self {
Self {
name: String::from("ReplicaSvc"),
client_context: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
client_context: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
}
}
}
Expand Down Expand Up @@ -311,7 +332,7 @@ impl ReplicaRpc for ReplicaService {
&self,
request: Request<ListReplicaOptions>,
) -> GrpcResult<ListReplicasResponse> {
self.locked(GrpcClientContext::new(&request, function_name!()), async {
self.shared(GrpcClientContext::new(&request, function_name!()), async {
let args = request.into_inner();
trace!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
Expand Down
53 changes: 11 additions & 42 deletions io-engine/src/grpc/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
v1::nexus::nexus_lookup,
GrpcClientContext,
GrpcResult,
Serializer,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, Lvs, LvsLvol},
spdk_rs::ffihelper::IntoCString,
Expand All @@ -45,7 +45,7 @@ const SNAPSHOT_READY_AS_SOURCE: bool = false;
#[allow(dead_code)]
pub struct SnapshotService {
name: String,
client_context: tokio::sync::Mutex<Option<GrpcClientContext>>,
replica_svc: super::replica::ReplicaService,
}

#[derive(Debug)]
Expand Down Expand Up @@ -201,55 +201,24 @@ impl From<VolumeSnapshotDescriptor> for SnapshotInfo {
}
}
#[async_trait::async_trait]
impl<F, T> Serializer<F, T> for SnapshotService
impl<F, T> RWSerializer<F, T> for SnapshotService
where
T: Send + 'static,
F: core::future::Future<Output = Result<T, Status>> + Send + 'static,
{
async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
let mut context_guard = self.client_context.lock().await;

// Store context as a marker of to detect abnormal termination of the
// request. Even though AssertUnwindSafe() allows us to
// intercept asserts in underlying method strategies, such a
// situation can still happen when the high-level future that
// represents gRPC call at the highest level (i.e. the one created
// by gRPC server) gets cancelled (due to timeout or somehow else).
// This can't be properly intercepted by 'locked' function itself in the
// first place, so the state needs to be cleaned up properly
// upon subsequent gRPC calls.
if let Some(c) = context_guard.replace(ctx) {
warn!("{}: gRPC method timed out, args: {}", c.id, c.args);
}

let fut = AssertUnwindSafe(f).catch_unwind();
let r = fut.await;

// Request completed, remove the marker.
let ctx = context_guard.take().expect("gRPC context disappeared");

match r {
Ok(r) => r,
Err(_e) => {
warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args);
Err(Status::cancelled(format!(
"{}: gRPC method panicked",
ctx.id
)))
}
}
self.replica_svc.locked(ctx, f).await
}
}
impl Default for SnapshotService {
fn default() -> Self {
Self::new()
async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result<T, Status> {
self.replica_svc.shared(ctx, f).await
}
}

impl SnapshotService {
pub fn new() -> Self {
pub fn new(replica_svc: super::replica::ReplicaService) -> Self {
Self {
name: String::from("SnapshotSvc"),
client_context: tokio::sync::Mutex::new(None),
replica_svc,
}
}
async fn serialized<T, F>(
Expand Down Expand Up @@ -490,7 +459,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotsRequest>,
) -> GrpcResult<ListSnapshotsResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down Expand Up @@ -739,7 +708,7 @@ impl SnapshotRpc for SnapshotService {
&self,
request: Request<ListSnapshotCloneRequest>,
) -> GrpcResult<ListSnapshotCloneResponse> {
self.locked(
self.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Bdev,
VerboseError,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, Serializer},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWSerializer},
lvs::{Error as LvsError, Lvol, Lvs, LvsLvol},
};
use ::function_name::named;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl TestRpc for TestService {

crate::core::spawn(async move {
let result = replica_svc
.locked(
.shared(
GrpcClientContext::new(&request, function_name!()),
async move {
let args = request.into_inner();
Expand Down
4 changes: 3 additions & 1 deletion io-engine/src/lvs/lvs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,9 @@ impl From<&Lvs> for LvsPtpl {
impl PtplFileOps for LvsPtpl {
fn destroy(&self) -> Result<(), std::io::Error> {
if let Some(path) = self.path() {
std::fs::remove_dir_all(path)?;
if path.exists() {
std::fs::remove_dir_all(path)?;
}
}
Ok(())
}
Expand Down

0 comments on commit 282f40c

Please sign in to comment.