Skip to content

Commit

Permalink
feature "profiling", some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
rstade committed Jan 2, 2019
1 parent 15646c9 commit b00e8c5
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 330 deletions.
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ path = "src/bin.rs"


[dependencies]
e2d2 = { version = "=0.3.6", path = "../NetBricks/framework", features = ["performance"] }
e2d2 = { version = "=0.4.0", path = "../NetBricks/framework", features = ["performance"] }
netfcts={ version="0.1.2", path = "../netfcts" }
rand = "0.3"
fnv = "*"
Expand All @@ -26,12 +26,11 @@ ipnet = ">=1.0"
toml = "0.4"
serde = "1.0"
serde_derive = "1.0"
eui48 = { version= "0.3", features=["serde"] }
error-chain = ">=0.12"
eui48 = { version= ">=0.3", features=["serde"] }
uuid = { version = ">=0.7", features = ["v4", "serde"] }
separator = ">= 0.3"
#serde_json = "*"
bincode = "*"

[features]

profiling =[]
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

cargo test $1 --no-run
cargo test $* --no-run
25 changes: 10 additions & 15 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ use netfcts::initialize_flowdirector;
use netfcts::tcp_common::{ReleaseCause, CData};
use netfcts::comm::{MessageFrom, MessageTo};
use netfcts::system::SystemData;
use netfcts::errors::*;
use netfcts::io::{ print_tcp_counters };
#[cfg(feature = "profiling")]
use netfcts::io::print_rx_tx_counters;
use netfcts::system::get_mac_from_ifname;

use tcp_proxy::{get_mac_from_ifname, read_config, setup_pipelines};
use tcp_proxy::{read_config, setup_pipelines};
use tcp_proxy::Connection;
use tcp_proxy::Container;
use tcp_proxy::errors::*;
use tcp_proxy::L234Data;
use tcp_proxy::spawn_recv_thread;
use tcp_proxy::TcpState;
Expand Down Expand Up @@ -237,19 +241,10 @@ pub fn main() {

loop {
match reply_mrx.recv_timeout(Duration::from_millis(1000)) {
Ok(MessageTo::Counter(pipeline_id, tcp_counter_c, tcp_counter_s, tx_packets)) => {
println!("\n");
println!("{}: client side {}", pipeline_id, tcp_counter_c);
println!("{}: server side {}", pipeline_id, tcp_counter_s);
if tx_packets.len() > 0 {
info!("{}: tx packets over time", pipeline_id);
info!(" {:>24} -{:8}", tx_packets[0].0.separated_string(), tx_packets[0].1);
}
if tx_packets.len() > 1 {
tx_packets.iter().zip(&tx_packets[1..]).enumerate().for_each(|(i,(&prev, &next))| {
info!("{:4}: {:>24} -{:8}", i, (next.0 - prev.0).separated_string(), (next.1 - prev.1))
});
}
Ok(MessageTo::Counter(pipeline_id, tcp_counter_c, tcp_counter_s, _rx_tx_stats)) => {
print_tcp_counters(&pipeline_id, &tcp_counter_c, &tcp_counter_s);
#[cfg(feature = "profiling")]
print_rx_tx_counters(&pipeline_id, &_rx_tx_stats);
tcp_counters_c.insert(pipeline_id.clone(), tcp_counter_c);
tcp_counters_s.insert(pipeline_id, tcp_counter_s);
}
Expand Down
19 changes: 0 additions & 19 deletions src/errors.rs

This file was deleted.

99 changes: 2 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ extern crate eui48;
extern crate ipnet;
extern crate serde;
extern crate uuid;
#[macro_use]
extern crate error_chain;
extern crate netfcts;

pub mod nftcp;
pub mod errors;
mod cmanager;

pub use netfcts::tcp_common::{CData, L234Data, UserData, TcpRole, TcpState, TcpCounter, TcpStatistics};
Expand All @@ -35,13 +32,13 @@ use eui48::MacAddress;
use uuid::Uuid;
use separator::Separatable;

use e2d2::native::zcsi::*;
use e2d2::common::ErrorKind as E2d2ErrorKind;
use e2d2::scheduler::*;
use e2d2::allocators::CacheAligned;
use e2d2::interface::{PortQueue, PmdPort, FlowDirector};

use errors::*;
use netfcts::errors::*;
use netfcts::io::print_hard_statistics;
use nftcp::setup_forwarder;
use netfcts::{is_kni_core, setup_kni, FlowSteeringMode};
use netfcts::comm::{MessageFrom, MessageTo, PipelineId};
Expand All @@ -52,10 +49,7 @@ use std::io::Read;
use std::any::Any;
use std::net::Ipv4Addr;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::ptr;
use std::sync::mpsc::Sender;
use std::sync::mpsc::Receiver;
use std::thread;
Expand Down Expand Up @@ -206,71 +200,6 @@ impl Container {
}
}

pub fn get_mac_from_ifname(ifname: &str) -> Result<MacAddress> {
let iface = Path::new("/sys/class/net").join(ifname).join("address");
let mut macaddr = String::new();
fs::File::open(iface).map_err(|e| e.into()).and_then(|mut f| {
f.read_to_string(&mut macaddr)
.map_err(|e| e.into())
.and_then(|_| MacAddress::parse_str(&macaddr.lines().next().unwrap_or("")).map_err(|e| e.into()))
})
}

pub fn get_mac_string_from_ifname(ifname: &str) -> Result<String> {
let iface = Path::new("/sys/class/net").join(ifname).join("address");
let mut macaddr = String::new();
fs::File::open(iface).map_err(|e| e.into()).and_then(|mut f| {
f.read_to_string(&mut macaddr)
.map_err(|e| e.into())
.and_then(|_| Ok(macaddr.lines().next().unwrap_or("").to_string()))
})
}

pub fn print_hard_statistics(port_id: u16) -> i32 {
let stats = RteEthStats::new();
let retval;
unsafe {
retval = rte_eth_stats_get(port_id, &stats as *const RteEthStats);
}
if retval == 0 {
println!("Port {}:\n{}\n", port_id, stats);
}
retval
}

pub fn print_xstatistics(port_id: u16) -> i32 {
let len;
unsafe {
len = rte_eth_xstats_get_names_by_id(port_id, ptr::null(), 0, ptr::null());
if len < 0 {
return len;
}
let xstats_names = vec![
RteEthXstatName {
name: [0; RTE_ETH_XSTATS_NAME_SIZE],
};
len as usize
];
let ids = vec![0u64; len as usize];
if len != rte_eth_xstats_get_names_by_id(port_id, xstats_names.as_ptr(), len as u32, ptr::null()) {
return -1;
};
let values = vec![0u64; len as usize];

if len != rte_eth_xstats_get_by_id(port_id, ptr::null(), values.as_ptr(), 0 as u32) {
return -1;
}

for i in 0..len as usize {
rte_eth_xstats_get_id_by_name(port_id, xstats_names[i].to_ptr(), &ids[i]);
{
println!("{}, {}: {}", i, xstats_names[i].to_str().unwrap(), values[i]);
}
}
}
len
}

pub fn setup_pipelines<F1, F2>(
core: i32,
ports: HashSet<CacheAligned<PortQueue>>,
Expand Down Expand Up @@ -353,7 +282,6 @@ pub fn spawn_recv_thread(mrx: Receiver<MessageFrom>, mut context: NetBricksConte
let _handle = thread::spawn(move || {
let mut senders = HashMap::new();
let mut tasks: Vec<Vec<(PipelineId, Uuid)>> = Vec::with_capacity(TaskType::NoTaskTypes as usize);
let mut start_tsc: HashMap<(PipelineId, &'static str), u64> = HashMap::new(); // start time stamps for tasks
let mut reply_to_main = None;

for _t in 0..TaskType::NoTaskTypes as usize {
Expand Down Expand Up @@ -408,29 +336,6 @@ pub fn spawn_recv_thread(mrx: Receiver<MessageFrom>, mut context: NetBricksConte
debug!("{}: task uuid= {}, type={:?}", pipeline_id, uuid, task_type);
tasks[task_type as usize].push((pipeline_id, uuid));
}
Ok(MessageFrom::GenTimeStamp(pipeline_id, item, count, tsc0, tsc1)) => {
debug!(
"pipe {}: GenTimeStamp for item {} -> count= {}, tsc0= {}, tsc1= {}",
pipeline_id,
item,
count,
tsc0.separated_string(),
tsc1.separated_string(),
);
if count == 0 {
start_tsc.insert((pipeline_id, item), tsc0);
} else {
let diff = tsc0 - start_tsc.get(&(pipeline_id.clone(), item)).unwrap();
info!(
"pipe {}: item= {}, count= {}, elapsed= {} cy, per count= {} cy",
pipeline_id,
item,
count,
diff.separated_string(),
diff / count as u64,
);
};
}
Ok(MessageFrom::Channel(pipeline_id, sender)) => {
debug!("got sender from {}", pipeline_id);
senders.insert(pipeline_id, sender);
Expand Down
47 changes: 31 additions & 16 deletions src/nftcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub fn setup_forwarder<F1, F2>(
let pipeline_id_clone = pipeline_id.clone();
let mut counter_c = TcpCounter::new();
let mut counter_s = TcpCounter::new();
let mut sent_packets = Vec::with_capacity(1000);
let mut rx_tx_stats = Vec::with_capacity(1000);

// set up the generator producing timer tick packets with our private EtherType
let (producer_timerticks, consumer_timerticks) = new_mpsc_queue_pair();
Expand All @@ -228,10 +228,14 @@ pub fn setup_forwarder<F1, F2>(
// group 2 -> send to KNI
let uuid_l4groupby = Uuid::new_v4();
let tx_stats = pci.tx_stats();
let rx_stats = pci.rx_stats();
// process TCP traffic addressed to Proxy
let mut time_adder_1 = TimeAdder::new(pipeline_id.clone(), "select_server", 9000);
let mut time_adder_2 = TimeAdder::new(pipeline_id.clone(), "SYN - SYN-ACK", 9000);
let mut time_adder_3= TimeAdder::new(pipeline_id.clone(), "connection lookup", 9000);
#[cfg(feature = "profiling")]
let mut time_adders = [
TimeAdder::new(pipeline_id.clone(), "select_server", 9000),
TimeAdder::new(pipeline_id.clone(), "SYN - SYN-ACK", 9000),
TimeAdder::new(pipeline_id.clone(), "connection lookup", 9000),
];
let mut l4groups = l2_input_stream.parse::<MacHeader>().parse::<IpHeader>().parse::<TcpHeader>().group_by(
3,
box move |p: &mut Packet<TcpHeader, EmptyMetadata>| {
Expand Down Expand Up @@ -440,9 +444,9 @@ pub fn setup_forwarder<F1, F2>(
) where
F: Fn(&mut Connection),
{
let p_clone=p.clone(); // creates reference to the mbuf in p
let payload_sz=tcp_payload_size(&p_clone);
c.payload_packet=Some(p_clone);
let p_clone = p.clone(); // creates reference to the mbuf in p
let payload_sz = tcp_payload_size(&p_clone);
c.payload_packet = Some(p_clone);
f_select_server(c);
c.c2s_inserted_bytes = tcp_payload_size(c.payload_packet.as_ref().unwrap()) as isize - payload_sz as isize;
/*
Expand All @@ -465,15 +469,15 @@ pub fn setup_forwarder<F1, F2>(
*/
set_header(&servers[c.con_rec_s.server_index], c.port(), h, me);
let syn = new_packet().unwrap().push_header(h.mac).unwrap().push_header(h.ip).unwrap().push_header(h.tcp).unwrap();
let mut old_p= unsafe { p.replace(syn) };
let mut old_p = unsafe { p.replace(syn) };
old_p.dereference_mbuf(); // must do this manually, so far
let hs_ip;
let hs_mac;
let hs_tcp;

unsafe {
// converting to raw pointer avoids to borrow mutably from syn
let ptr =p.get_mut_pre_header().unwrap() as *mut IpHeader;
let ptr = p.get_mut_pre_header().unwrap() as *mut IpHeader;
hs_ip = &mut *ptr;
let ptr = p.get_mut_pre_pre_header().unwrap() as *mut MacHeader;
hs_mac = &mut *ptr;
Expand Down Expand Up @@ -525,7 +529,7 @@ pub fn setup_forwarder<F1, F2>(
producer.enqueue_one(p_clone);

if c.payload_packet.is_some() {
let mut payload_packet= c.payload_packet.take().unwrap();
let mut payload_packet = c.payload_packet.take().unwrap();
payload_packet.replace_header(h.tcp); // same tcp header as in Ack packet
/*
let mut delayed_ip = new_packet().unwrap().push_header(h.mac).unwrap().push_header(h.ip).unwrap();
Expand Down Expand Up @@ -609,7 +613,7 @@ pub fn setup_forwarder<F1, F2>(
pipeline_id_clone.clone(),
counter_c.clone(),
counter_s.clone(),
sent_packets.clone(),
rx_tx_stats.clone(),
)).unwrap();
}
Ok(MessageTo::FetchCRecords) => {
Expand All @@ -628,7 +632,12 @@ pub fn setup_forwarder<F1, F2>(
cm.release_timeouts(&utils::rdtsc_unsafe(), &mut wheel);
}
//save stats
sent_packets.push((utils::rdtsc_unsafe(), tx_stats.stats.load(Ordering::Relaxed)) );
let tx_stats_now = tx_stats.stats.load(Ordering::Relaxed);
let rx_stats_now = rx_stats.stats.load(Ordering::Relaxed);
// only save changes
if rx_tx_stats.last().is_none() || tx_stats_now != rx_tx_stats.last().unwrap().1 {
rx_tx_stats.push((utils::rdtsc_unsafe(), rx_stats_now, tx_stats_now));
}
}
_ => {
if hs_flow.dst_port == me.l234.port {
Expand All @@ -639,8 +648,13 @@ pub fn setup_forwarder<F1, F2>(
cm.get_mut_by_sock(&hs_flow.src_socket_addr())
};

if hs.tcp.syn_flag() { time_adder_2.add(utils::rdtsc_unsafe() - timestamp_entry); }
else { time_adder_3.add(utils::rdtsc_unsafe() - timestamp_entry)}
if hs.tcp.syn_flag() {
#[cfg(feature = "profiling")]
time_adders[1].add(utils::rdtsc_unsafe() - timestamp_entry);
} else {
#[cfg(feature = "profiling")]
time_adders[2].add(utils::rdtsc_unsafe() - timestamp_entry)
}

if opt_c.is_none() {
warn!("{} unexpected client side packet (seq={}): no state for socket {}, sending to KNI i/f", thread_id, hs.tcp.seq_num(), hs_flow.src_socket_addr());
Expand Down Expand Up @@ -763,7 +777,8 @@ pub fn setup_forwarder<F1, F2>(
&& old_s_state == TcpState::Listen {
// should be the first payload packet from client
select_server(p, &mut c, &mut hs, &me, &servers, &f_select_server);
time_adder_1.add(utils::rdtsc_unsafe() - timestamp_entry);
#[cfg(feature = "profiling")]
time_adders[1].add(utils::rdtsc_unsafe() - timestamp_entry);
trace!("{} SYN packet to server - L3: {}, L4: {}", thread_id, hs.ip, p.get_header());
c.con_rec_s.push_state(TcpState::SynReceived);
counter_c[TcpStatistics::Payload] += 1;
Expand Down Expand Up @@ -852,7 +867,7 @@ pub fn setup_forwarder<F1, F2>(
counter_c[TcpStatistics::SentFin] += 1;
}
} else if old_c_state >= TcpState::LastAck && hs.tcp.ack_flag() {
if hs.tcp.ack_num() == c.seqn_fin_p2s.wrapping_add(1) {
if hs.tcp.ack_num() == c.seqn_fin_p2s.wrapping_add(1) {
// received Ack from server for a FIN
match old_c_state {
TcpState::LastAck => {
Expand Down
Loading

0 comments on commit b00e8c5

Please sign in to comment.