[Features][CustomSched][BPFLib] #29

Open
wants to merge 1 commit into
base: master
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -4,4 +4,5 @@ __pycache__/*.lock
**.lock
**__pycache__**
.idea/
.vscode/
.vscode/
**.swp
4 changes: 2 additions & 2 deletions src/Ilúvatar/.gitignore
@@ -4,5 +4,5 @@ iluvatar_worker/src/worker.dev*json
iluvatar_worker_library/tests/resources/worker.dev*json
iluvatar_controller/src/controller.dev*json
target/
**.swp
.vscode/
**.sw*
.vscode/
4 changes: 3 additions & 1 deletion src/Ilúvatar/Cargo.toml
@@ -8,7 +8,9 @@ members = [
"iluvatar_library",
"iluvatar_worker_library",
"iluvatar_energy_mon",
"iluvatar_rpc"
"iluvatar_rpc",
"iluvatar_bpf_library",
"fs_policy_tsksz",
]
resolver = "2"

6 changes: 5 additions & 1 deletion src/Ilúvatar/Cross.toml
@@ -5,5 +5,9 @@ passthrough = ["ARCH=amd64", "GO_VERSION=1.22.0", "CNI_VERSION=v1.1.1", "GOPATH=
default-target = "x86_64-unknown-linux-gnu" # use this target if none is explicitly provided
pre-build = [ # additional commands to run prior to building the package
"dpkg --add-architecture $CROSS_DEB_ARCH",
"apt-get update && apt-get --assume-yes install protobuf-compiler iproute2 wget curl runc bridge-utils iptables net-tools sysstat"
"apt-get update && apt-get --assume-yes install protobuf-compiler iproute2 wget curl runc bridge-utils iptables net-tools sysstat libelf-dev lsb-release wget software-properties-common gnupg",
"wget https://apt.llvm.org/llvm.sh && chmod +x llvm.sh && ./llvm.sh all && ln -s /usr/bin/ld.lld-18 /usr/bin/ld.lld && ln -s /usr/bin/clang-18 /usr/bin/clang && ln -s /usr/bin/clang++-18 /usr/bin/clang++ && rm llvm.sh"
Contributor

Can you add these to the setup docs as new dependencies? I assume they're required for the bpf stuff

Collaborator Author

These are required at build time. From what I understand, cross builds inside a container, so these dependencies need to be installed there; they do not need to be on the host. The setup docs are for the host, right?

]



8 changes: 6 additions & 2 deletions src/Ilúvatar/Makefile
@@ -12,7 +12,7 @@ endif
RUST_C?=cross
DEBUG_FLAGS=--all-targets $(TARGET) -j $(NPROCS)
RELEASE_FLAGS=--lib --bins $(TARGET) -j $(NPROCS) --release
CARGO_ARGS?=""
CARGO_ARGS?=

default: debug

@@ -23,7 +23,7 @@ clean:
@$(RUST_C) clean
check:
@echo "Checking"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) check --all-features $(DEBUG_FLAGS) $(CARGO_ARGS)
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) check $(DEBUG_FLAGS) $(CARGO_ARGS)
release:
@echo "Building release"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build $(RELEASE_FLAGS) $(CARGO_ARGS)
@@ -37,6 +37,10 @@ tiny:
debug:
@echo "Building debug"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build $(DEBUG_FLAGS) $(CARGO_ARGS)
fix:
@echo "Fixing lint errors"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) fix $(DEBUG_FLAGS) $(CARGO_ARGS)
Contributor

Can you combine this with the clippy fix command you changed below? It would be nice to have it in the Makefile to simplify fixing lint errors locally.

Collaborator Author

done

@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) clippy --fix --allow-no-vcs --workspace --examples --benches --no-deps -- -Dclippy::suspicious -Dclippy::correctness -Dclippy::perf -Aclippy::single_match -Aclippy::new_without_default -Aclippy::too_many_arguments -Aclippy::type-complexity -Dclippy::from_over_into -Aclippy::redundant-field-names -Dwarnings
spans:
@echo "Building full_spans"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build --features full_spans $(RELEASE_FLAGS) $(CARGO_ARGS)
8 changes: 8 additions & 0 deletions src/Ilúvatar/ansible/worker.yml
@@ -123,6 +123,14 @@
remote_src: "{{__remote_bin_src}}"
become: yes

- name: copy fs_policy_tsksz
ansible.builtin.copy:
src: "{{__bin_src}}/fs_policy_tsksz"
dest: "{{bin_dir}}/"
mode: "preserve"
remote_src: "{{__remote_bin_src}}"
become: yes

- name: copy worker config
ansible.builtin.copy:
src: "{{__bin_src}}/{{worker.config_name}}"
Empty file.
33 changes: 33 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/Cargo.toml
@@ -0,0 +1,33 @@
[package]
name = "fs_policy_tsksz"
version = "0.0.3"
authors = ["Abdul Rehman <abdulrehmanee010@gmail.com>"]
edition = "2021"
description = "A simple scheduler that preserves locality for a function cgroup"
license = "GPL-2.0-only"

[dependencies]
anyhow = "1.0.65"
plain = "0.2.3"
ctrlc = { version = "3.1", features = ["termination"] }
libbpf-rs = "0.24.1"
libc = "0.2.137"
scx_utils = { version = "1.0.7" }
scx_rustland_core = { version = "2.2.3" }

# Specific to iluvatar
iluvatar_library = { path = "../iluvatar_library" }
iluvatar_worker_library = { path = "../iluvatar_worker_library" }
clap = { version = "4.5.4", features = ["derive"] }
ipc-channel = { version = "0.18.1", features = ["memfd"] }
serde = { version = "1.0" }

[build-dependencies]
scx_utils = { version = "1.0.7" }
scx_rustland_core = { version = "2.2.3" }

[features]
enable_backtrace = []



1 change: 1 addition & 0 deletions src/Ilúvatar/fs_policy_tsksz/LICENSE
20 changes: 20 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/README.md
@@ -0,0 +1,20 @@
# scx_rlfifo

This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).

## Overview

scx_rlfifo is a simple FIFO scheduler that runs in user space, based on the
scx_rustland_core framework.

## Typical Use Case

This scheduler is provided as a simple template that can be used as a baseline
to test more complex scheduling policies.

## Production Ready?

Definitely not. Using this scheduler in a production environment is not
recommended, unless there are specific requirements that necessitate a basic
FIFO scheduling approach. Even then, it's still recommended to use the kernel's
SCHED_FIFO real-time class.
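
To make the FIFO policy described above concrete, here is a minimal sketch in plain Rust. It does not use the scx_rustland_core API; `Task`, `FifoQueue`, `enqueue`, and `dispatch` are hypothetical names chosen for illustration only.

```rust
use std::collections::VecDeque;

/// Hypothetical task record; a real scheduler would carry more
/// per-task data received from the BPF side.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub pid: i32,
}

/// A first-in, first-out run queue kept entirely in user space.
#[derive(Default)]
pub struct FifoQueue {
    tasks: VecDeque<Task>,
}

impl FifoQueue {
    /// Queue a task that became runnable.
    pub fn enqueue(&mut self, task: Task) {
        self.tasks.push_back(task);
    }

    /// Pick the next task to run: always the one that has waited longest.
    pub fn dispatch(&mut self) -> Option<Task> {
        self.tasks.pop_front()
    }
}
```

Tasks leave the queue in exactly the order they entered it, which is all a FIFO policy promises; there is no priority or time-slice accounting in this sketch.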
11 changes: 11 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/build.rs
@@ -0,0 +1,11 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

fn main() {
scx_utils::BpfBuilder::new()
.unwrap()
.enable_intf("src/bpf/intf.h", "bpf_intf.rs")
.enable_skel("src/bpf/main.bpf.c", "bpf")
.build()
.unwrap();
}
14 changes: 14 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/meson.build
@@ -0,0 +1,14 @@
if serialize
sched_deps = [libbpf, bpftool_target, sched]
else
sched_deps = [libbpf, bpftool_target]
endif

sched = custom_target('scx_rlfifo',
output: '@PLAINNAME@.__PHONY__',
input: 'Cargo.toml',
command: [cargo, 'build', '--manifest-path=@INPUT@', '--target-dir=@OUTDIR@',
cargo_build_args],
env: cargo_env,
depends: sched_deps,
build_always_stale: true)
8 changes: 8 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/rustfmt.toml
@@ -0,0 +1,8 @@
# Get help on options with `rustfmt --help=config`
# Please keep these in alphabetical order.
edition = "2021"
group_imports = "StdExternalCrate"
imports_granularity = "Item"
merge_derives = false
use_field_init_shorthand = true
version = "Two"
189 changes: 189 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/src/bpf.rs
@@ -0,0 +1,189 @@
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use scx_utils::enums::scx_enums;
use scx_utils::import_enums;
use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_skel::*;

use anyhow::Context;
use anyhow::Result;

use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::OpenObject;

use libc::{pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Defined in UAPI
const SCHED_EXT: i32 = 7;

pub struct BpfScheduler<'cb> {
pub skel: BpfSkel<'cb>, // Low-level BPF connector
struct_ops: Option<libbpf_rs::Link>, // Low-level BPF methods
queued_stats: libbpf_rs::RingBuffer<'cb>, // ring buffer of tasks pids to be switched to schedext
}

#[derive(Clone, Copy, Debug)]
#[allow(non_camel_case_types, dead_code)]
pub struct lpolicy_stats(bpf_intf::policy_stats);

macro_rules! define_buffer {
( $bufname: ident, $abufname: ident, $abuf: ident, $callback: ident, $tdst: ty ) => {
const $bufname: usize = std::mem::size_of::<$tdst>();
#[repr(align(8))]
struct $abufname([u8; $bufname]);
static mut $abuf: $abufname = $abufname([0; $bufname]);
fn $callback(data: &[u8]) -> i32 {
unsafe {
$abuf.0.copy_from_slice(data);
}
LIBBPF_STOP
}
};
}

define_buffer!(
BUFSIZE_STATS,
AlignedBufferstats,
BUF_STATS,
callback_stats,
bpf_intf::policy_stats
);
fn fetch_stats(bytes: &[u8]) -> lpolicy_stats {
let ps = unsafe { *(bytes.as_ptr() as *const bpf_intf::policy_stats) };
lpolicy_stats(ps)
}

// Special negative error code for libbpf to stop after consuming just one item from a BPF
// ring buffer.
const LIBBPF_STOP: i32 = -255;

impl<'cb> BpfScheduler<'cb> {
pub fn init(
open_object: &'cb mut MaybeUninit<OpenObject>,
slice_us: u64,
exit_dump_len: u32,
verbose: bool,
) -> Result<Self> {
// Open the BPF prog first for verification.
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(verbose);
let mut skel = scx_ops_open!(skel_builder, open_object, tsksz_ops)?;

// Lock all the memory to prevent page faults that could trigger potential deadlocks during
// scheduling.
ALLOCATOR.lock_memory();

skel.struct_ops.tsksz_ops_mut().exit_dump_len = exit_dump_len;
skel.maps.bss_data.usersched_pid = std::process::id();
skel.maps.rodata_data.effective_slice_ns = slice_us * 1000;

let path = "/sys/fs/bpf/func_metadata";
let func_metadata = &mut skel.maps.func_metadata;
assert!(func_metadata.reuse_pinned_map("/asdf").is_err());
func_metadata
.reuse_pinned_map(path)
.expect("failed to reuse map");

// Attach BPF scheduler.
let mut skel = scx_ops_load!(skel, tsksz_ops, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, tsksz_ops)?);

// Build the ring buffer of queued tasks.
let rb_map = &mut skel.maps.queued_stats;
let mut builder = libbpf_rs::RingBufferBuilder::new();
builder.add(rb_map, callback_stats).unwrap();
let queued_stats = builder.build().unwrap();
Contributor

We don't want unwrap and panic! because they can cause hard-to-debug issues, and the error is only sent to stderr, where it is easily lost.

Collaborator Author

fs_policy_tsksz is launched as a separate process, and its stderr is written to a separate log file. If it panics, the log file can be inspected offline.
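
For reference, one way to address the unwrap concern is to propagate the failure with context instead of panicking. The sketch below is std-only and purely illustrative: `SetupError`, `add_callback`, and `build_ring_buffer` are hypothetical stand-ins, not libbpf-rs APIs. Since `init` already returns `anyhow::Result` and the file imports `anyhow::Context`, the equivalent in the PR would presumably be a `.context("...")?` on the builder calls, but that is an assumption about how the author would want to write it.

```rust
use std::fmt;

/// Hypothetical error type for the setup steps; the PR itself uses `anyhow`.
#[derive(Debug)]
pub struct SetupError(String);

impl fmt::Display for SetupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for SetupError {}

/// Stand-in for a fallible builder step (hypothetical): fails when `ok` is false.
fn add_callback(ok: bool) -> Result<(), String> {
    if ok {
        Ok(())
    } else {
        Err("map is not a ring buffer".to_string())
    }
}

/// Propagate the failure with added context instead of calling `unwrap()`.
pub fn build_ring_buffer(ok: bool) -> Result<&'static str, SetupError> {
    add_callback(ok)
        .map_err(|e| SetupError(format!("failed to add ring buffer callback: {e}")))?;
    Ok("ring buffer ready")
}
```

The caller then decides how to log or report the error, so it goes through the same logging path as every other failure instead of appearing only as a panic message on stderr.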


// Make sure to use the SCHED_EXT class at least for the scheduler itself.
match Self::use_sched_ext() {
0 => Ok(Self {
skel,
struct_ops,
queued_stats,
}),
err => Err(anyhow::Error::msg(format!(
"sched_setscheduler error: {}",
err
))),
}
}

// Receive stats from the BPF scheduler to switch to schedext policy.
pub fn dequeue_stats(&mut self) -> Result<Option<lpolicy_stats>, i32> {
match self.queued_stats.consume_raw() {
0 => Ok(None),
LIBBPF_STOP => {
// A stats record was received; decode it from the static buffer.
let stats = unsafe { fetch_stats(&BUF_STATS.0) };
Ok(Some(stats))
}
res if res < 0 => Err(res),
res => panic!(
"Unexpected return value from libbpf-rs::consume_raw(): {}",
res
),
}
}

// Set scheduling class for the scheduler itself to SCHED_EXT
fn use_sched_ext() -> i32 {
#[cfg(target_env = "gnu")]
let param: sched_param = sched_param { sched_priority: 0 };
#[cfg(target_env = "musl")]
let param: sched_param = sched_param {
sched_priority: 0,
sched_ss_low_priority: 0,
sched_ss_repl_period: timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};

unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
}

// Read exit code from the BPF part.
pub fn exited(&mut self) -> bool {
uei_exited!(&self.skel, uei)
}

// Called on exit to shutdown and report exit message from the BPF part.
pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
self.struct_ops.take();
uei_report!(&self.skel, uei)
}
}

// Disconnect the low-level BPF scheduler.
impl<'a> Drop for BpfScheduler<'a> {
fn drop(&mut self) {
if let Some(struct_ops) = self.struct_ops.take() {
drop(struct_ops);
}
ALLOCATOR.unlock_memory();
}
}
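
The way `callback_stats` and `dequeue_stats` cooperate to consume one record at a time can be sketched without libbpf. In the sketch below, `consume` is a hypothetical stand-in for a ring buffer consumer that stops as soon as the callback returns a negative value and hands that value back to the caller; `STOP` reuses the `-255` sentinel from the PR, and `dequeue_one` mirrors the shape of `dequeue_stats`. None of these names are libbpf-rs APIs, and the closure-captured slot replaces the static buffer used in the PR.

```rust
use std::collections::VecDeque;

/// Sentinel returned by the callback to stop after one item (same value as in the PR).
const STOP: i32 = -255;

/// Hypothetical consumer: passes items to `cb` in order and stops as soon as
/// `cb` returns a negative value, returning that value. Otherwise it returns
/// the number of items consumed.
fn consume<F: FnMut(u64) -> i32>(pending: &mut VecDeque<u64>, mut cb: F) -> i32 {
    let mut consumed = 0;
    while let Some(item) = pending.pop_front() {
        let ret = cb(item);
        if ret < 0 {
            return ret;
        }
        consumed += 1;
    }
    consumed
}

/// Pop at most one item from the queue.
pub fn dequeue_one(pending: &mut VecDeque<u64>) -> Result<Option<u64>, i32> {
    let mut slot = None;
    // The callback stores the item and asks the consumer to stop immediately.
    let ret = consume(pending, |item| {
        slot = Some(item);
        STOP
    });
    match ret {
        0 => Ok(None),       // nothing was pending
        STOP => Ok(slot),    // exactly one item was delivered
        other => Err(other), // any other code is reported to the caller
    }
}
```

Unlike `dequeue_stats`, this sketch returns an error for an unexpected positive return value instead of panicking; that is a design choice of the sketch, not something the PR does.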