Skip to content

Commit

Permalink
Flow Scaling (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
GIC-de committed Jan 25, 2024
1 parent d17ecab commit 5505db6
Show file tree
Hide file tree
Showing 23 changed files with 628 additions and 674 deletions.
9 changes: 6 additions & 3 deletions code/bngblaster/src/bbl.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,12 @@ main(int argc, char *argv[])
}
LOG(INFO, "Total PPS of all streams: %.2f\n", g_ctx->total_pps);

if(!bbl_stream_index_init()) {
fprintf(stderr, "Error: Failed to init stream index\n");
goto CLEANUP;
}


/* Setup control job. */
timer_add_periodic(&g_ctx->timer_root, &g_ctx->control_timer, "Control Timer",
1, 0, g_ctx, &bbl_ctrl_job);
Expand All @@ -601,9 +607,6 @@ main(int argc, char *argv[])
}
}

/* Init IO stream token buckets. */
io_init_stream_token_bucket();

/* Start smear job. Use a crazy nsec bucket '12345678',
* such that we do not accidentally smear ourselves. */
timer_add_periodic(&g_ctx->timer_root, &g_ctx->smear_timer, "Timer Smearing",
Expand Down
80 changes: 50 additions & 30 deletions code/bngblaster/src/bbl_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ link_add(char *interface_name)
link_config = calloc(1, sizeof(bbl_link_config_s));
link_config->interface = strdup(interface_name);
link_config->io_mode = g_ctx->config.io_mode;
link_config->io_burst = g_ctx->config.io_burst;
link_config->io_slots_rx = g_ctx->config.io_slots;
link_config->io_slots_tx = g_ctx->config.io_slots;
link_config->qdisc_bypass = g_ctx->config.qdisc_bypass;
Expand All @@ -479,11 +480,12 @@ json_parse_link(json_t *link, bbl_link_config_s *link_config)

const char *schema[] = {
"interface", "description", "mac",
"io-mode", "io-slots", "io-slots-tx",
"io-slots-rx", "qdisc-bypass", "tx-interval",
"rx-interval", "tx-threads", "rx-threads",
"rx-cpuset", "tx-cpuset", "lag-interface",
"lacp-priority"
"io-mode", "io-slots" "io-burst",
"io-slots-tx", "io-slots-rx", "qdisc-bypass",
"tx-interval","rx-interval",
"tx-threads", "rx-threads",
"rx-cpuset", "tx-cpuset",
"lag-interface", "lacp-priority"
};
if(!schema_validate(link, "links", schema,
sizeof(schema)/sizeof(schema[0]))) {
Expand Down Expand Up @@ -553,7 +555,12 @@ json_parse_link(json_t *link, bbl_link_config_s *link_config)
if(value) {
link_config->io_slots_rx = json_number_value(value);
}

JSON_OBJ_GET_NUMBER(link, value, "links", "io-burst", 1, 65535);
if(value) {
link_config->io_burst = json_number_value(value);
} else {
link_config->io_burst = g_ctx->config.io_burst;
}
JSON_OBJ_GET_BOOL(link, value, "links", "qdisc-bypass");
if(value) {
link_config->qdisc_bypass = json_boolean_value(value);
Expand Down Expand Up @@ -2538,31 +2545,38 @@ json_parse_config_streams(json_t *root)
bbl_stream_config_s *stream_config = g_ctx->config.stream_config;

if(json_typeof(root) != JSON_OBJECT) {
fprintf(stderr, "JSON config error: Configuration root element must object\n");
fprintf(stderr, "JSON config error: Configuration root element must be of type object\n");
return false;
}

section = json_object_get(root, "streams");
if(json_is_array(section)) {
/* Get tail end of stream-config list. */
if(stream_config) {
while(stream_config->next) {
stream_config = stream_config->next;
}
if(section) {
if(!json_is_array(section)) {
fprintf(stderr, "JSON config error: Configuration streams element must contain list of objects\n");
return false;
}
/* Config is provided as array (multiple streams) */
size = json_array_size(section);
for(i = 0; i < size; i++) {
if(!stream_config) {
g_ctx->config.stream_config = calloc(1, sizeof(bbl_stream_config_s));
stream_config = g_ctx->config.stream_config;
} else {
stream_config->next = calloc(1, sizeof(bbl_stream_config_s));
stream_config = stream_config->next;
}
if(!json_parse_stream(json_array_get(section, i), stream_config)) {
return false;
}
} else {
return true;
}

/* Get tail end of stream-config list. */
if(stream_config) {
while(stream_config->next) {
stream_config = stream_config->next;
}
}
/* Config is provided as array (multiple streams) */
size = json_array_size(section);
for(i = 0; i < size; i++) {
if(!stream_config) {
g_ctx->config.stream_config = calloc(1, sizeof(bbl_stream_config_s));
stream_config = g_ctx->config.stream_config;
} else {
stream_config->next = calloc(1, sizeof(bbl_stream_config_s));
stream_config = stream_config->next;
}
if(!json_parse_stream(json_array_get(section, i), stream_config)) {
return false;
}
}
return true;
Expand Down Expand Up @@ -3571,7 +3585,7 @@ json_parse_config(json_t *root)
if(json_is_object(section)) {

const char *schema[] = {
"io-mode", "io-slots", "qdisc-bypass",
"io-mode", "io-slots", "io-burst", "qdisc-bypass",
"tx-interval", "rx-interval", "tx-threads",
"rx-threads", "capture-include-streams", "mac-modifier",
"lag", "network", "access", "a10nsp", "links"
Expand Down Expand Up @@ -3608,6 +3622,11 @@ json_parse_config(json_t *root)
if(value) {
g_ctx->config.io_slots = json_number_value(value);
}
value = json_object_get(section, "io-burst");
JSON_OBJ_GET_NUMBER(section, value, "interfaces", "io-burst", 1, 65535);
if(value) {
g_ctx->config.io_burst = json_number_value(value);
}
JSON_OBJ_GET_BOOL(section, value, "interfaces", "qdisc-bypass");
if(value) {
g_ctx->config.qdisc_bypass = json_boolean_value(value);
Expand Down Expand Up @@ -4041,9 +4060,10 @@ bbl_config_init_defaults()
g_ctx->pcap.include_streams = false;
g_ctx->config.username = g_default_user;
g_ctx->config.password = g_default_pass;
g_ctx->config.tx_interval = 1 * MSEC;
g_ctx->config.rx_interval = 1 * MSEC;
g_ctx->config.tx_interval = 0.1 * MSEC;
g_ctx->config.rx_interval = 0.1 * MSEC;
g_ctx->config.io_slots = 4096;
g_ctx->config.io_burst = 256;
g_ctx->config.io_max_stream_len = 9000;
g_ctx->config.qdisc_bypass = true;
g_ctx->config.sessions = 1;
Expand Down Expand Up @@ -4099,7 +4119,7 @@ bbl_config_init_defaults()
g_ctx->config.multicast_traffic_pps = 1000;
g_ctx->config.traffic_autostart = true;
g_ctx->config.stream_rate_calc = true;
g_ctx->config.stream_max_burst = 32;
g_ctx->config.stream_max_burst = 16;
g_ctx->config.multicast_traffic_autostart = true;
g_ctx->config.session_traffic_autostart = true;
}
1 change: 1 addition & 0 deletions code/bngblaster/src/bbl_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ typedef struct bbl_link_config_

uint16_t io_slots_tx;
uint16_t io_slots_rx;
uint16_t io_burst;

bool qdisc_bypass;

Expand Down
8 changes: 4 additions & 4 deletions code/bngblaster/src/bbl_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ bbl_ctx_add()
g_ctx->vlan_session_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key64, bbl_key64_hash, BBL_SESSION_HASHTABLE_SIZE);
g_ctx->l2tp_session_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key32, bbl_key32_hash, BBL_SESSION_HASHTABLE_SIZE);
g_ctx->li_flow_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key32, bbl_key32_hash, BBL_LI_HASHTABLE_SIZE);
g_ctx->stream_flow_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key64, bbl_key64_hash, BBL_STREAM_FLOW_HASHTABLE_SIZE);

return true;
}
Expand Down Expand Up @@ -119,13 +118,14 @@ bbl_ctx_del() {
bbl_session_free(p);
}
}
free(g_ctx->session_list);


if(g_ctx->session_list) free(g_ctx->session_list);
if(g_ctx->stream_index) free(g_ctx->stream_index);

/* Free hash table dictionaries. */
dict_free(g_ctx->vlan_session_dict, NULL);
dict_free(g_ctx->l2tp_session_dict, NULL);
dict_free(g_ctx->li_flow_dict, NULL);
dict_free(g_ctx->stream_flow_dict, NULL);

pcapng_free();
free(g_ctx);
Expand Down
8 changes: 7 additions & 1 deletion code/bngblaster/src/bbl_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ typedef struct bbl_ctx_
dict *vlan_session_dict; /* hashtable for 1:1 vlan sessions */
dict *l2tp_session_dict; /* hashtable for L2TP sessions */
dict *li_flow_dict; /* hashtable for LI flows */
dict *stream_flow_dict; /* hashtable for traffic stream flows */

bbl_stream_s **stream_index;
bbl_stream_s *stream_head;
bbl_stream_s *stream_tail;
uint64_t streams;

bbl_stream_group_s *stream_groups;

Expand All @@ -89,6 +93,7 @@ typedef struct bbl_ctx_
char *ctrl_socket_path;
bbl_ctrl_thread_s *ctrl_thread;
io_thread_s *io_threads; /* single linked list of threads */
io_bucket_s *io_bucket;

bool tcp;
bool dpdk;
Expand Down Expand Up @@ -146,6 +151,7 @@ typedef struct bbl_ctx_
io_mode_t io_mode;

uint16_t io_slots;
uint16_t io_burst;
uint16_t io_max_stream_len;

bool qdisc_bypass;
Expand Down
1 change: 0 additions & 1 deletion code/bngblaster/src/bbl_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

#define BBL_SESSION_HASHTABLE_SIZE 128993 /* is a prime number */
#define BBL_LI_HASHTABLE_SIZE 32771 /* is a prime number */
#define BBL_STREAM_FLOW_HASHTABLE_SIZE 128993 /* is a prime number */

/* Mock Addresses */
#define MOCK_IP_LOCAL 167772170 /* 10.0.0.10 */
Expand Down
2 changes: 1 addition & 1 deletion code/bngblaster/src/bbl_lag.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ typedef struct bbl_lag_

uint8_t active_count;
bbl_lag_member_s *active_list[LAG_MEMBER_ACTIVE_MAX];
uint64_t select;
uint32_t select;

CIRCLEQ_ENTRY(bbl_lag_) lag_qnode;
CIRCLEQ_HEAD(lag_member_, bbl_lag_member_ ) lag_member_qhead; /* list of member interfaces */
Expand Down
1 change: 1 addition & 0 deletions code/bngblaster/src/bbl_protocols.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ typedef enum protocol_error_ {
IGNORED,
EMPTY,
FULL,
WAIT,
} protocol_error_t;

typedef enum icmpv6_message_ {
Expand Down
57 changes: 16 additions & 41 deletions code/bngblaster/src/bbl_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ bbl_rx_stream_network(bbl_network_interface_s *interface,
bbl_ethernet_header_s *eth)
{
bbl_stream_s *stream;
if(!eth->bbl || memcmp(interface->mac, eth->dst, ETH_ADDR_LEN) != 0) {
return false;
}
stream = bbl_stream_rx(eth, NULL);
if(!eth->bbl) return false;
stream = bbl_stream_rx(eth, interface->mac);
if(stream) {
if(stream->rx_network_interface == NULL) {
stream->rx_network_interface = interface;
Expand All @@ -32,29 +30,13 @@ bbl_rx_stream_access(bbl_access_interface_s *interface,
bbl_ethernet_header_s *eth)
{
bbl_stream_s *stream;
bbl_session_s *session;
uint32_t session_id = 0;

if(!(eth->bbl && eth->bbl->type == BBL_TYPE_UNICAST)) {
return false;
}

session_id |= eth->dst[5];
session_id |= eth->dst[4] << 8;
session_id |= eth->dst[3] << 16;

session = bbl_session_get(session_id);
if(session) {
if(session->session_state != BBL_TERMINATED &&
session->session_state != BBL_IDLE) {
stream = bbl_stream_rx(eth, session);
if(stream) {
if(stream->rx_access_interface == NULL) {
stream->rx_access_interface = interface;
}
return true;
}
if(!eth->bbl) return false;
stream = bbl_stream_rx(eth, NULL);
if(stream) {
if(stream->rx_access_interface == NULL) {
stream->rx_access_interface = interface;
}
return true;
}
return false;
}
Expand All @@ -64,10 +46,8 @@ bbl_rx_stream_a10nsp(bbl_a10nsp_interface_s *interface,
bbl_ethernet_header_s *eth)
{
bbl_stream_s *stream;
if(!eth->bbl || memcmp(interface->mac, eth->dst, ETH_ADDR_LEN) != 0) {
return false;
}
stream = bbl_stream_rx(eth, NULL);
if(!eth->bbl) return false;
stream = bbl_stream_rx(eth, interface->mac);
if(stream) {
if(stream->rx_a10nsp_interface == NULL) {
stream->rx_a10nsp_interface = interface;
Expand All @@ -81,19 +61,14 @@ bool
bbl_rx_thread(bbl_interface_s *interface,
bbl_ethernet_header_s *eth)
{
bbl_network_interface_s *network_interface = interface->network;

bbl_network_interface_s *network_interface;
if(interface->state == INTERFACE_DISABLED) {
return true;
}

while(network_interface) {
if(network_interface->vlan == eth->vlan_outer) {
return bbl_rx_stream_network(network_interface, eth);
}
network_interface = network_interface->next;
}
if(interface->access) {
network_interface = interface->network_vlan[eth->vlan_outer];
if(network_interface) {
return bbl_rx_stream_network(network_interface, eth);
} else if(interface->access) {
return bbl_rx_stream_access(interface->access, eth);
} else if(interface->a10nsp) {
return bbl_rx_stream_a10nsp(interface->a10nsp, eth);
Expand Down Expand Up @@ -130,7 +105,7 @@ bbl_rx_handler(bbl_interface_s *interface,
if(!bbl_rx_stream_network(network_interface, eth)) {
bbl_network_rx_handler(network_interface, eth);
}
} else if(interface->access) {
} else if(interface->access) {
if(!bbl_rx_stream_access(interface->access, eth)) {
bbl_access_rx_handler(interface->access, eth);
}
Expand Down
17 changes: 3 additions & 14 deletions code/bngblaster/src/bbl_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,6 @@ bbl_session_monkey_job(timer_s *timer) {
/**
* bbl_session_get
*
* This function allows to change the state of a session including
* the required action caused by state changes.
*
* @param ctx global context
* @param session_id session-id
* @return session or NULL if session not found
*/
Expand Down Expand Up @@ -1749,25 +1745,18 @@ bbl_session_ctrl_traffic_stop(int fd, uint32_t session_id, json_t *arguments __a
int
bbl_session_ctrl_traffic_reset(int fd, uint32_t session_id __attribute__((unused)), json_t *arguments __attribute__((unused)))
{
bbl_stream_s *stream;
struct dict_itor *itor;
bbl_stream_s *stream = g_ctx->stream_head;

g_ctx->stats.session_traffic_flows_verified = 0;

/* Iterate over all traffic streams */
itor = dict_itor_new(g_ctx->stream_flow_dict);
dict_itor_first(itor);
for (; dict_itor_valid(itor); dict_itor_next(itor)) {
stream = (bbl_stream_s*)*dict_itor_datum(itor);
if(!stream) {
continue;
}
while(stream) {
if(stream->session && stream->session_traffic) {
stream->session->session_traffic.flows_verified = 0;
bbl_stream_reset(stream);
}
stream = stream->next;
}
dict_itor_free(itor);
return bbl_ctrl_status(fd, "ok", 200, NULL);
}

Expand Down
Loading

0 comments on commit 5505db6

Please sign in to comment.