From e672fd57b1631551392ae4321e13ff99018078b2 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 29 Nov 2023 17:50:17 +0000 Subject: [PATCH] fix(grpc/snapshot): snapshot grpc should use replica svc We should not allow concurrent snapshot and replica operations as they might clash. TODO: allow per-resource locking. Furthermore, it was observed that holding a bdev ptr whilst it's being deleted might cause a crash; we need to determine whether this was due to it being held over an async point, or a more general problem. Signed-off-by: Tiago Castro --- io-engine/src/grpc/server.rs | 8 +++-- io-engine/src/grpc/v1/snapshot.rs | 53 +++++++------------------ test/grpc/test_replica.js | 4 +-- 3 files changed, 19 insertions(+), 46 deletions(-) diff --git a/io-engine/src/grpc/server.rs b/io-engine/src/grpc/server.rs index 3a838e13e..d2d3ad327 100644 --- a/io-engine/src/grpc/server.rs +++ b/io-engine/src/grpc/server.rs @@ -93,10 +93,14 @@ impl MayastorGrpcServer { v1::replica::ReplicaRpcServer::new(replica_v1.clone()) })) .add_optional_service(enable_v1.map(|_| { - v1::test::TestRpcServer::new(TestService::new(replica_v1)) + v1::test::TestRpcServer::new(TestService::new( + replica_v1.clone(), + )) })) .add_optional_service(enable_v1.map(|_| { - v1::snapshot::SnapshotRpcServer::new(SnapshotService::new()) + v1::snapshot::SnapshotRpcServer::new(SnapshotService::new( + replica_v1, + )) })) .add_optional_service(enable_v1.map(|_| { v1::host::HostRpcServer::new(HostService::new( diff --git a/io-engine/src/grpc/v1/snapshot.rs b/io-engine/src/grpc/v1/snapshot.rs index 07745e4d7..87b9ccb68 100644 --- a/io-engine/src/grpc/v1/snapshot.rs +++ b/io-engine/src/grpc/v1/snapshot.rs @@ -21,7 +21,7 @@ use crate::{ v1::nexus::nexus_lookup, GrpcClientContext, GrpcResult, - Serializer, + RWSerializer, }, lvs::{Error as LvsError, Lvol, Lvs, LvsLvol}, spdk_rs::ffihelper::IntoCString, @@ -45,7 +45,7 @@ const SNAPSHOT_READY_AS_SOURCE: bool = false; #[allow(dead_code)] pub struct 
SnapshotService { name: String, - client_context: tokio::sync::Mutex>, + replica_svc: super::replica::ReplicaService, } #[derive(Debug)] @@ -201,55 +201,24 @@ impl From for SnapshotInfo { } } #[async_trait::async_trait] -impl Serializer for SnapshotService +impl RWSerializer for SnapshotService where T: Send + 'static, F: core::future::Future> + Send + 'static, { async fn locked(&self, ctx: GrpcClientContext, f: F) -> Result { - let mut context_guard = self.client_context.lock().await; - - // Store context as a marker of to detect abnormal termination of the - // request. Even though AssertUnwindSafe() allows us to - // intercept asserts in underlying method strategies, such a - // situation can still happen when the high-level future that - // represents gRPC call at the highest level (i.e. the one created - // by gRPC server) gets cancelled (due to timeout or somehow else). - // This can't be properly intercepted by 'locked' function itself in the - // first place, so the state needs to be cleaned up properly - // upon subsequent gRPC calls. - if let Some(c) = context_guard.replace(ctx) { - warn!("{}: gRPC method timed out, args: {}", c.id, c.args); - } - - let fut = AssertUnwindSafe(f).catch_unwind(); - let r = fut.await; - - // Request completed, remove the marker. 
- let ctx = context_guard.take().expect("gRPC context disappeared"); - - match r { - Ok(r) => r, - Err(_e) => { - warn!("{}: gRPC method panicked, args: {}", ctx.id, ctx.args); - Err(Status::cancelled(format!( - "{}: gRPC method panicked", - ctx.id - ))) - } - } + self.replica_svc.locked(ctx, f).await } -} -impl Default for SnapshotService { - fn default() -> Self { - Self::new() + async fn shared(&self, ctx: GrpcClientContext, f: F) -> Result { + self.replica_svc.shared(ctx, f).await } } + impl SnapshotService { - pub fn new() -> Self { + pub fn new(replica_svc: super::replica::ReplicaService) -> Self { Self { name: String::from("SnapshotSvc"), - client_context: tokio::sync::Mutex::new(None), + replica_svc, } } async fn serialized( @@ -490,7 +459,7 @@ impl SnapshotRpc for SnapshotService { &self, request: Request, ) -> GrpcResult { - self.locked( + self.shared( GrpcClientContext::new(&request, function_name!()), async move { let args = request.into_inner(); @@ -739,7 +708,7 @@ impl SnapshotRpc for SnapshotService { &self, request: Request, ) -> GrpcResult { - self.locked( + self.shared( GrpcClientContext::new(&request, function_name!()), async move { let args = request.into_inner(); diff --git a/test/grpc/test_replica.js b/test/grpc/test_replica.js index c91557285..04fc6dfe9 100644 --- a/test/grpc/test_replica.js +++ b/test/grpc/test_replica.js @@ -166,7 +166,7 @@ describe('replica', function () { client.createPool( { name: POOL, disks: disks.map((d) => `${d}?blk_size=1238513`) }, (err) => { - assert.equal(err.code, grpc.status.INTERNAL); + assert.equal(err.code, grpc.status.INVALID_ARGUMENT); done(); } ); @@ -517,7 +517,7 @@ describe('replica', function () { [ (next) => rmBlockFile(next), (next) => fs.writeFile(blockFile, buf, next), - (next) => client.createPool({ name: POOL, disks: disks }, next) + (next) => client.createPool({ name: POOL, disks }, next) ], done );