Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing I/O engine crashes #1549

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions ci.nix
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{ nospdk ? false, norust ? false, spdk_rel ? false }:
{ nospdk ? false, norust ? false, spdk_rel ? false, asan ? false }:
let
dsavitskiy marked this conversation as resolved.
Show resolved Hide resolved
sources = import ./nix/sources.nix;
pkgs = import sources.nixpkgs {
Expand Down Expand Up @@ -61,7 +61,8 @@ mkShell {
xfsprogs
yasm
] ++ (if (nospdk) then [ spdk.buildInputs ] else [ spdk ])
++ pkgs.lib.optional (!norust) channel.stable
++ pkgs.lib.optional (!norust && asan) channel.asan
++ pkgs.lib.optional (!norust && !asan) channel.stable
++ pkgs.lib.optional (!norust) channel.nightly;

RUST_NIGHTLY_PATH = channel.nightly;
Expand All @@ -72,12 +73,38 @@ mkShell {
FIO_SPDK = if nospdk then null else "${spdk}/fio/spdk_nvme";
ETCD_BIN = "${etcd}/bin/etcd";

IO_ENGINE_DIR = if asan then "target/x86_64-unknown-linux-gnu/debug" else "target/debug";

# ASAN-related Cargo settings.
ASAN_ENABLE = if asan then "1" else null;
ASAN_OPTIONS = if asan then "detect_leaks=0" else null;
ASAN_BUILD_ENV = if asan then "shell" else null;
RUSTFLAGS = if asan then "-Zsanitizer=address" else null;
CARGO_BUILD_RUSTFLAGS = if asan then "-Zbuild-std" else null;
CARGO_BUILD_TARGET = if asan then "x86_64-unknown-linux-gnu" else null;
CARGO_PROFILE_DEV_PANIC = if asan then "unwind" else null;
RUST_BACKTRACE = if asan then "full" else null;

shellHook = ''
${pkgs.lib.optionalString (asan) "export LLVM_SYMBOLIZER_DIR=$(dirname $(realpath $(which llvm-symbolizer)))"}

${pkgs.lib.optionalString (asan) "echo 'AddressSanitizer is enabled, forcing nightly rustc.'"}
${pkgs.lib.optionalString (asan) "echo ' ASAN_ENABLE :' $\{ASAN_ENABLE\}"}
${pkgs.lib.optionalString (asan) "echo ' ASAN_OPTIONS :' $\{ASAN_OPTIONS\}"}
${pkgs.lib.optionalString (asan) "echo ' RUSTFLAGS :' $\{RUSTFLAGS\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_BUILD_RUSTFLAGS :' $\{CARGO_BUILD_RUSTFLAGS\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_BUILD_TARGET :' $\{CARGO_BUILD_TARGET\}"}
${pkgs.lib.optionalString (asan) "echo ' CARGO_PROFILE_DEV_PANIC :' $\{CARGO_PROFILE_DEV_PANIC\}"}
${pkgs.lib.optionalString (asan) "echo ' RUST_BACKTRACE :' $\{RUST_BACKTRACE\}"}
${pkgs.lib.optionalString (asan) "echo ' LLVM_SYMBOLIZER_DIR :' $\{LLVM_SYMBOLIZER_DIR\}"}
${pkgs.lib.optionalString (asan) "echo"}

${pkgs.lib.optionalString (!nospdk) "echo 'SPDK version :' $(echo $SPDK_PATH | sed 's/.*libspdk-//g')"}
${pkgs.lib.optionalString (!nospdk) "echo 'SPDK path :' $SPDK_PATH"}
${pkgs.lib.optionalString (!nospdk) "echo 'SPDK FIO plugin :' $FIO_SPDK"}
${pkgs.lib.optionalString (!norust) "echo 'Rust version :' $(rustc --version 2> /dev/null || echo '${norustc_msg}')"}
${pkgs.lib.optionalString (!norust) "echo 'Rust path :' $(which rustc 2> /dev/null || echo '${norustc_msg}')"}
echo 'I/O engine dir :' $IO_ENGINE_DIR
${pkgs.lib.optionalString (nospdk) "cowsay ${nospdk_moth}"}
${pkgs.lib.optionalString (nospdk) "export CFLAGS=-msse4"}
${pkgs.lib.optionalString (nospdk) "echo"}
Expand Down
17 changes: 16 additions & 1 deletion io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,10 +851,11 @@ impl<'n> Nexus<'n> {
let name = self.name.clone();

// After calling unregister_bdev_async(), Nexus is gone.
let evt = self.event(EventAction::Delete);
match self.as_mut().bdev_mut().unregister_bdev_async().await {
Ok(_) => {
info!("Nexus '{name}': nexus destroyed ok");
self.event(EventAction::Delete).generate();
evt.generate();
Ok(())
}
Err(err) => {
Expand Down Expand Up @@ -1142,6 +1143,20 @@ impl<'n> Nexus<'n> {
unsafe { Pin::new_unchecked(self.bdev_mut()) }
}

/// Gets a nexus reference from an untyped bdev.
/// # Warning:
/// No checks are performed (e.g. bdev module name check), as it is assumed
dsavitskiy marked this conversation as resolved.
Show resolved Hide resolved
/// that the provided bdev is a nexus bdev.
#[inline(always)]
pub(crate) unsafe fn unsafe_from_untyped_bdev(
bdev: spdk_rs::UntypedBdev,
) -> &'n Nexus<'n> {
spdk_rs::Bdev::<Nexus<'n>>::unsafe_from_inner_ptr(
bdev.unsafe_inner_ptr() as *mut _,
)
.data()
}

/// Sets the required alignment of the Nexus.
pub(crate) unsafe fn set_required_alignment(
self: Pin<&mut Self>,
Expand Down
70 changes: 56 additions & 14 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ impl<'n> Nexus<'n> {
// Close and remove the child.
let res = match self.lookup_child(uri) {
Some(child) => {
// Remove child from the I/O path.
// Detach the child from the I/O path, and close its handles.
if let Some(device) = child.get_device_name() {
self.disconnect_device(&device).await;
self.detach_device(&device).await;
self.disconnect_all_detached_devices().await;
dsavitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// Close child's device.
Expand Down Expand Up @@ -974,18 +975,39 @@ impl<'n> Nexus<'n> {
return Ok(());
}

// Disconnect the device from all the channels.
// Detach the device from all the channels.
//
// After disconnecting, the device will no longer be used by the
// channels, and all I/Os failing due to this device will eventually
// resubmit and succeeded (if any healthy children are left).
self.disconnect_device(&dev).await;
//
// Device disconnection is done in two steps (detach, than disconnect)
// in order to prevent an I/O race when retiring a device.
self.detach_device(&dev).await;

// Disconnect the devices with failed controllers _before_ pause,
// otherwise pause would stuck. Keep all controoled that are _not_
// failed (e.g., in the case I/O failed due to ENOSPC).
self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| h.is_ctrlr_failed());
})
.await;

// Destroy (close) the device. The subsystem must be paused to do this
// properly.
// Disconnect, destroy and close the device. The subsystem must be
// paused to do this properly.
{
debug!("{self:?}: retire: pausing...");
self.as_mut().pause().await?;
debug!("{self:?}: retire: pausing ok");
let res = self.as_mut().pause().await;
dsavitskiy marked this conversation as resolved.
Show resolved Hide resolved
match &res {
Ok(_) => debug!("{self:?}: retire: pausing ok"),
Err(e) => warn!("{self:?}: retire: pausing failed: {e}"),
};

// Disconnect the all previously detached device handles. This has
// to be done after the pause to prevent an I/O race.
self.disconnect_all_detached_devices().await;

res?;

self.child_retire_destroy_device(&dev).await;

Expand Down Expand Up @@ -1055,20 +1077,39 @@ impl<'n> Nexus<'n> {
Ok(())
}

/// Disconnects a device from all I/O channels.
pub(crate) async fn disconnect_device(&self, dev: &str) {
/// Detaches the device's handles from all I/O channels.
///
/// The detached handles must be disconnected and dropped by a
/// `disconnect_detached_devices()` call.
///
/// Device disconnection is done in two steps (detach, than disconnect) in
/// order to prevent an I/O race when retiring a device.
pub(crate) async fn detach_device(&self, dev: &str) {
if !self.has_io_device {
return;
}

debug!("{self:?}: disconnecting '{dev}' from all channels ...");
debug!("{self:?}: detaching '{dev}' from all channels...");

self.traverse_io_channels_async(dev, |channel, dev| {
channel.disconnect_device(dev);
channel.detach_device(dev);
})
.await;

debug!("{self:?}: '{dev}' detached from all I/O channels");
}

/// Disconnects all the detached devices on all I/O channels by dropping
/// their handles.
pub(crate) async fn disconnect_all_detached_devices(&self) {
debug!("{self:?}: disconnecting all detached devices ...");

self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|_| true);
})
.await;

debug!("{self:?}: '{dev}' disconnected from all I/O channels");
debug!("{self:?}: disconnected all detached devices");
}

/// Destroys the device being retired.
Expand Down Expand Up @@ -1143,7 +1184,8 @@ impl<'n> Nexus<'n> {

// Step 1: Close I/O channels for all children.
for dev in nexus.child_devices() {
nexus.disconnect_device(&dev).await;
nexus.detach_device(&dev).await;
nexus.disconnect_all_detached_devices().await;

device_cmd_queue().enqueue(DeviceCommand::RetireDevice {
nexus_name: nexus.name.clone(),
Expand Down
57 changes: 50 additions & 7 deletions io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use spdk_rs::Thread;
pub struct NexusChannel<'n> {
writers: Vec<Box<dyn BlockDeviceHandle>>,
readers: Vec<Box<dyn BlockDeviceHandle>>,
detached: Vec<Box<dyn BlockDeviceHandle>>,
io_logs: Vec<IOLogChannel>,
previous_reader: UnsafeCell<usize>,
fail_fast: u32,
Expand Down Expand Up @@ -123,6 +124,7 @@ impl<'n> NexusChannel<'n> {
Self {
writers,
readers,
detached: Vec::new(),
io_logs: nexus.io_log_channels(),
previous_reader: UnsafeCell::new(0),
nexus: unsafe { nexus.pinned_mut() },
Expand Down Expand Up @@ -209,16 +211,57 @@ impl<'n> NexusChannel<'n> {
}
}

/// Disconnects a child device from the I/O path.
pub fn disconnect_device(&mut self, device_name: &str) {
/// Detaches a child device from this I/O channel, moving the device's
/// handles to the list of detached devices to disconnect later.
///
/// The detached handles must be disconnected and dropped by a
/// `disconnect_detached_devices()` call.
pub(super) fn detach_device(&mut self, device_name: &str) {
self.previous_reader = UnsafeCell::new(0);

self.readers
.retain(|c| c.get_device().device_name() != device_name);
self.writers
.retain(|c| c.get_device().device_name() != device_name);
if let Some(d) = self
.readers
.iter()
.position(|c| c.get_device().device_name() == device_name)
{
let t = self.readers.remove(d);
self.detached.push(t);
}

if let Some(d) = self
.writers
.iter()
.position(|c| c.get_device().device_name() == device_name)
{
let t = self.writers.remove(d);
self.detached.push(t);
}

debug!("{self:?}: device '{device_name}' detached");
}

/// Disconnects previously detached device handles by dropping them.
/// Devices to drop are filtered by the given predicate: true to drop
/// a device, false to keep it.
pub(super) fn disconnect_detached_devices<F>(&mut self, mut drop_pred: F)
where
F: FnMut(&dyn BlockDeviceHandle) -> bool,
{
let n = self.detached.len();
info!("{self:?}: disconnecting {n} detached device handles...");

self.detached.retain(|h| !drop_pred(h.as_ref()));

debug!("{self:?}: device '{device_name}' disconnected");
let m = self.detached.len();
if m == 0 {
info!("{self:?}: all detached device handles disconnected");
} else {
let d = n - m;
info!(
"{self:?}: {d} detached device handle(s) disconnected, \
{m} remain(s)"
);
}
}

/// Refreshing our channels simply means that we either have a child going
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,8 @@ impl<'n> NexusBio<'n> {
// set the IO as failed in the submission stage.
self.ctx_mut().failed += 1;

self.channel_mut().disconnect_device(&device);
self.channel_mut().detach_device(&device);
self.channel_mut().disconnect_detached_devices(|_| true);
tiagolobocastro marked this conversation as resolved.
Show resolved Hide resolved

if let Some(log) = self.fault_device(
&device,
Expand Down
2 changes: 2 additions & 0 deletions io-engine/src/bdev/nvmx/controller_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ impl SpdkNvmeController {
}
}

/// Returns a pointer to the underlying SPDK struct.
#[inline(always)]
pub fn as_ptr(&self) -> *mut spdk_nvme_ctrlr {
self.0.as_ptr()
}
Expand Down
5 changes: 5 additions & 0 deletions io-engine/src/bdev/nvmx/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,11 @@ impl BlockDeviceHandle for NvmeDeviceHandle {
let id = inner.ext_host_id();
Ok(*id)
}

/// Determines if the underlying controller is failed.
fn is_ctrlr_failed(&self) -> bool {
self.ctrlr.is_failed
}
}

impl Drop for NvmeDeviceHandle {
Expand Down
5 changes: 5 additions & 0 deletions io-engine/src/core/block_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ pub trait BlockDeviceHandle {
cb: IoCompletionCallback,
cb_arg: IoCompletionCallbackArg,
) -> Result<(), CoreError>;

/// Determines if the underlying controller is failed.
fn is_ctrlr_failed(&self) -> bool {
false
}
}

fn block_device_io_completion(
Expand Down
26 changes: 26 additions & 0 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,10 @@ impl MayastorEnvironment {
// setup the logger as soon as possible
self.init_logger();

if option_env!("ASAN_ENABLE").unwrap_or_default() == "1" {
print_asan_env();
}

self.load_yaml_config();

if let Some(ptpl) = &self.ptpl_dir {
Expand Down Expand Up @@ -1064,3 +1068,25 @@ fn make_hostnqn(node_name: Option<&String>) -> Option<String> {
node_name.map(|n| format!("{NVME_NQN_PREFIX}:node-name:{n}"))
})
}

fn print_asan_env() {
fn print_var(s: &str, v: Option<&str>) {
let v = v.unwrap_or_default();
info!(" {s:25} = {v}");
}

warn!("Compiled with Address Sanitizer enabled");
print_var("ASAN_OPTIONS", option_env!("ASAN_OPTIONS"));
print_var("ASAN_BUILD_ENV", option_env!("ASAN_BUILD_ENV"));
print_var("RUSTFLAGS", option_env!("RUSTFLAGS"));
print_var(
"CARGO_BUILD_RUSTFLAGS",
option_env!("CARGO_BUILD_RUSTFLAGS"),
);
print_var("CARGO_BUILD_TARGET", option_env!("CARGO_BUILD_TARGET"));
print_var(
"CARGO_PROFILE_DEV_PANIC",
option_env!("CARGO_PROFILE_DEV_PANIC"),
);
print_var("RUST_BACKTRACE", option_env!("RUST_BACKTRACE"));
}
7 changes: 4 additions & 3 deletions io-engine/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ impl ToErrno for CoreError {

/// Logical volume layer failure.
#[derive(Debug, Copy, Clone, Eq, PartialOrd, PartialEq)]
#[repr(C)]
pub enum LvolFailure {
NoSpace,
}
Expand All @@ -472,6 +473,7 @@ pub enum IoSubmissionFailure {
// Generic I/O completion status for block devices, which supports per-protocol
// error domains.
#[derive(Copy, Clone, Eq, PartialOrd, PartialEq)]
#[repr(C)]
pub enum IoCompletionStatus {
Success,
NvmeError(NvmeStatus),
Expand Down Expand Up @@ -501,10 +503,9 @@ impl From<NvmeStatus> for IoCompletionStatus {
match s {
NvmeStatus::NO_SPACE
| NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => {
IoCompletionStatus::LvolError(LvolFailure::NoSpace)
Self::LvolError(LvolFailure::NoSpace)
}

_ => IoCompletionStatus::NvmeError(s),
_ => Self::NvmeError(s),
}
}
}
Expand Down
Loading
Loading