Skip to content

Commit

Permalink
fix ci after introducing commands and unsigned int types
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Jul 10, 2024
1 parent b5ef0a1 commit 947c10d
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 35 deletions.
14 changes: 7 additions & 7 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use clap::Parser;
use datafusion::{
arrow::{
array::{Array, AsArray, ListArray, RecordBatch},
datatypes::{Int32Type, Int64Type},
datatypes::{UInt32Type, UInt64Type},
},
common::{FileType, GetExt},
datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions},
Expand Down Expand Up @@ -194,15 +194,15 @@ struct OffcpuDataFrame(Vec<RecordBatch>);
impl OffcpuDataFrame {
fn iter(&self) -> impl Iterator<Item = OffcpuData> + '_ {
self.0.iter().flat_map(|batch| {
let tgid = batch.column(0).as_primitive::<Int32Type>();
let pid = batch.column(1).as_primitive::<Int32Type>();
let offcpu = batch.column(2).as_primitive::<Int64Type>();
let tgid = batch.column(0).as_primitive::<UInt32Type>();
let pid = batch.column(1).as_primitive::<UInt32Type>();
let offcpu = batch.column(2).as_primitive::<UInt64Type>();
let kstacks = batch.column(3).as_any().downcast_ref::<ListArray>().unwrap();
multizip((tgid.iter(), pid.iter(), offcpu.iter(), kstacks.iter())).map(|(tgid, pid, offcpu, kstacks)| {
OffcpuData {
tgid: tgid.unwrap_or(0) as u32,
pid: pid.unwrap_or(0) as u32,
offcpu: Duration::from_nanos(offcpu.unwrap_or(0) as u64),
tgid: tgid.unwrap_or(0),
pid: pid.unwrap_or(0),
offcpu: Duration::from_nanos(offcpu.unwrap_or(0)),
kstack: kstacks,
}
})
Expand Down
7 changes: 2 additions & 5 deletions past/proptest-regressions/tests.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion past/src/bpf/past.skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2297,7 +2297,7 @@ mod imp {
110, 97, 108, 95, 115, 116, 114, 117, 99, 116, 0, 116, 97, 115, 107, 95, 100, 101, 108, 97, 121, 95, 105, 110,
102, 111, 0, 116, 97, 115, 107, 95, 103, 114, 111, 117, 112, 0, 116, 114, 97, 99, 101, 95, 101, 118, 101, 110,
116, 95, 99, 97, 108, 108, 0, 117, 112, 114, 111, 98, 101, 95, 116, 97, 115, 107, 0, 118, 109, 95, 115, 116,
114, 117, 99, 116, 0, 47, 116, 109, 112, 47, 46, 116, 109, 112, 101, 68, 107, 103, 76, 71, 47, 98, 112, 102,
114, 117, 99, 116, 0, 47, 116, 109, 112, 47, 46, 116, 109, 112, 115, 116, 89, 77, 104, 82, 47, 98, 112, 102,
47, 115, 114, 99, 47, 98, 112, 102, 47, 117, 115, 100, 116, 46, 98, 112, 102, 46, 104, 0, 9, 105, 102, 32, 40,
33, 76, 73, 78, 85, 88, 95, 72, 65, 83, 95, 66, 80, 70, 95, 67, 79, 79, 75, 73, 69, 41, 32, 123, 0, 9, 9, 108,
111, 110, 103, 32, 105, 112, 32, 61, 32, 80, 84, 95, 82, 69, 71, 83, 95, 73, 80, 40, 99, 116, 120, 41, 59, 0,
Expand Down
3 changes: 2 additions & 1 deletion past/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl Collector {
// all integers are cast to a signed form because of the API provided by rust parquet lib
// arithmetic operations will be correctly performed on unsigned integers, configured in schema
// TODO maybe i should move cast closer to the schema definition

match event {
Received::Switch(event) => {
let command = match self.tgid_to_command.get(&event.tgid) {
Expand All @@ -102,7 +103,7 @@ impl Collector {
duration: (event.end - event.start) as i64,
cpu: event.cpu_id as i32,
tgid: event.tgid as i32,
pid: event.tgid as i32,
pid: event.pid as i32,
command: command.clone(),
ustack: event.ustack,
kstack: event.kstack,
Expand Down
11 changes: 9 additions & 2 deletions past/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@ impl<Fr: Frames, Sym: Symbolizer> Program<Fr, Sym> {
}

pub fn on_event(&mut self, event: Received) -> Result<()> {
self.stats.total_rows += 1;
self.stats.rows_in_current_file += 1;
// TODO i need to adjust stats based on response from on_event
// this is hotfix for ci
match event {
Received::TraceEnter(_) | Received::ProcessExec(_) | Received::ProcessExit(_) | Received::Unknown(_) => {}
Received::Switch(_) | Received::TraceExit(_) | Received::PerfStack(_) | Received::TraceClose(_) => {
self.stats.total_rows += 1;
self.stats.rows_in_current_file += 1;
}
}
on_event(
self.writer.as_mut().expect("writer should be present"),
&self.frames,
Expand Down
26 changes: 13 additions & 13 deletions past/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use blazesym::symbolize::{Reason, Sym, Symbolized};
use datafusion::{
arrow::{
array::{AsArray, ListArray, RecordBatch},
datatypes::{Int32Type, Int64Type},
datatypes::{UInt16Type, UInt32Type, UInt64Type},
},
common::{FileType, GetExt},
datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions},
Expand Down Expand Up @@ -471,10 +471,10 @@ struct StoredPerf {
impl Batch {
fn iter_perf(&self) -> impl Iterator<Item = StoredPerf> + '_ {
self.0.iter().flat_map(|batch| {
let timestamp = batch.column(0).as_primitive::<Int64Type>();
let cpu = batch.column(1).as_primitive::<Int32Type>();
let tgid = batch.column(2).as_primitive::<Int32Type>();
let pid = batch.column(3).as_primitive::<Int32Type>();
let timestamp = batch.column(0).as_primitive::<UInt64Type>();
let cpu = batch.column(1).as_primitive::<UInt16Type>();
let tgid = batch.column(2).as_primitive::<UInt32Type>();
let pid = batch.column(3).as_primitive::<UInt32Type>();
let command = batch.column(4).as_string::<i32>();

let ustack = batch.column(5);
Expand All @@ -492,7 +492,7 @@ impl Batch {
kstack_array.iter(),
))
.map(|(timestamp, cpu, tgid, pid, command, ustack, kstack)| StoredPerf {
timestamp: *timestamp as u64,
timestamp: *timestamp,
cpu: *cpu as u64,
tgid: *tgid as u64,
pid: *pid as u64,
Expand Down Expand Up @@ -528,11 +528,11 @@ impl Batch {

fn iter_switch(&self) -> impl Iterator<Item = StoredSwitch> + '_ {
self.0.iter().flat_map(|batch| {
let timestamp = batch.column(0).as_primitive::<Int64Type>();
let duration = batch.column(1).as_primitive::<Int64Type>();
let cpu = batch.column(2).as_primitive::<Int32Type>();
let tgid = batch.column(3).as_primitive::<Int32Type>();
let pid = batch.column(4).as_primitive::<Int32Type>();
let timestamp = batch.column(0).as_primitive::<UInt64Type>();
let duration = batch.column(1).as_primitive::<UInt64Type>();
let cpu = batch.column(2).as_primitive::<UInt16Type>();
let tgid = batch.column(3).as_primitive::<UInt32Type>();
let pid = batch.column(4).as_primitive::<UInt32Type>();
let command = batch.column(5).as_string::<i32>();

let ustack = batch.column(6);
Expand All @@ -553,8 +553,8 @@ impl Batch {
))
.map(
|(timestamp, duration, cpu, tgid, pid, command, ustack, kstack)| StoredSwitch {
timestamp: *timestamp as u64,
duration: *duration as u64,
timestamp: *timestamp,
duration: *duration,
cpu: *cpu as u64,
tgid: *tgid as u64,
pid: *pid as u64,
Expand Down
10 changes: 5 additions & 5 deletions past/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ impl Deref for Comm {
}

#[derive(Debug)]
pub(crate) struct Proc{
pub(crate) struct Proc {
pub(crate) tgid: i32,
pub(crate) comm: Comm,
pub(crate) comm: Comm,
}

pub fn scan_proc(comms: HashSet<&str>,) -> anyhow::Result<Vec<Proc>> {
pub fn scan_proc(comms: HashSet<&str>) -> anyhow::Result<Vec<Proc>> {
let mut rst = vec![];
for entry in fs::read_dir("/proc")? {
let entry = entry?;
Expand All @@ -74,10 +74,10 @@ pub fn scan_proc(comms: HashSet<&str>,) -> anyhow::Result<Vec<Proc>> {
if let Ok(tgid) = name.parse::<u32>() {
let comm = fs::read_to_string(path.join("comm"))?;
if comms.contains(comm.trim()) {
rst.push(Proc{
rst.push(Proc {
tgid: tgid as i32,
comm: Comm::from(&comm),
});
});
debug!("discovered tgid = {} for command = {}", tgid, comm);
}
}
Expand Down
2 changes: 1 addition & 1 deletion pastconv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct Opt {
cmd: Command,
#[clap(short, long, global = true, default_value = "/tmp/past/STACKS-*.parquet")]
register: String,
#[clap(short, long, global = true, help="print version and exit")]
#[clap(short, long, global = true, help = "print version and exit")]
version: bool,
}

Expand Down

0 comments on commit 947c10d

Please sign in to comment.