diff --git a/code/bngblaster/src/bbl.c b/code/bngblaster/src/bbl.c index 56bb60ba..27be9681 100644 --- a/code/bngblaster/src/bbl.c +++ b/code/bngblaster/src/bbl.c @@ -590,6 +590,12 @@ main(int argc, char *argv[]) } LOG(INFO, "Total PPS of all streams: %.2f\n", g_ctx->total_pps); + if(!bbl_stream_index_init()) { + fprintf(stderr, "Error: Failed to init stream index\n"); + goto CLEANUP; + } + + /* Setup control job. */ timer_add_periodic(&g_ctx->timer_root, &g_ctx->control_timer, "Control Timer", 1, 0, g_ctx, &bbl_ctrl_job); @@ -601,9 +607,6 @@ main(int argc, char *argv[]) } } - /* Init IO stream token buckets. */ - io_init_stream_token_bucket(); - /* Start smear job. Use a crazy nsec bucket '12345678', * such that we do not accidentally smear ourselves. */ timer_add_periodic(&g_ctx->timer_root, &g_ctx->smear_timer, "Timer Smearing", diff --git a/code/bngblaster/src/bbl_config.c b/code/bngblaster/src/bbl_config.c index d05e1678..66cf5cb9 100644 --- a/code/bngblaster/src/bbl_config.c +++ b/code/bngblaster/src/bbl_config.c @@ -459,6 +459,7 @@ link_add(char *interface_name) link_config = calloc(1, sizeof(bbl_link_config_s)); link_config->interface = strdup(interface_name); link_config->io_mode = g_ctx->config.io_mode; + link_config->io_burst = g_ctx->config.io_burst; link_config->io_slots_rx = g_ctx->config.io_slots; link_config->io_slots_tx = g_ctx->config.io_slots; link_config->qdisc_bypass = g_ctx->config.qdisc_bypass; @@ -479,11 +480,12 @@ json_parse_link(json_t *link, bbl_link_config_s *link_config) const char *schema[] = { "interface", "description", "mac", - "io-mode", "io-slots", "io-slots-tx", - "io-slots-rx", "qdisc-bypass", "tx-interval", - "rx-interval", "tx-threads", "rx-threads", - "rx-cpuset", "tx-cpuset", "lag-interface", - "lacp-priority" + "io-mode", "io-slots" "io-burst", + "io-slots-tx", "io-slots-rx", "qdisc-bypass", + "tx-interval","rx-interval", + "tx-threads", "rx-threads", + "rx-cpuset", "tx-cpuset", + "lag-interface", "lacp-priority" }; if(!schema_validate(link, "links", schema, sizeof(schema)/sizeof(schema[0]))) { @@ -553,7 +555,12 @@ json_parse_link(json_t *link, bbl_link_config_s *link_config) if(value) { link_config->io_slots_rx = json_number_value(value); } - + JSON_OBJ_GET_NUMBER(link, value, "links", "io-burst", 1, 65535); + if(value) { + link_config->io_burst = json_number_value(value); + } else { + link_config->io_burst = g_ctx->config.io_burst; + } JSON_OBJ_GET_BOOL(link, value, "links", "qdisc-bypass"); if(value) { link_config->qdisc_bypass = json_boolean_value(value); @@ -2538,31 +2545,38 @@ json_parse_config_streams(json_t *root) bbl_stream_config_s *stream_config = g_ctx->config.stream_config; if(json_typeof(root) != JSON_OBJECT) { - fprintf(stderr, "JSON config error: Configuration root element must object\n"); + fprintf(stderr, "JSON config error: Configuration root element must be of type object\n"); return false; } section = json_object_get(root, "streams"); - if(json_is_array(section)) { - /* Get tail end of stream-config list. */ - if(stream_config) { - while(stream_config->next) { - stream_config = stream_config->next; - } + if(section) { + if(!json_is_array(section)) { + fprintf(stderr, "JSON config error: Configuration streams element must contain list of objects\n"); + return false; } - /* Config is provided as array (multiple streams) */ - size = json_array_size(section); - for(i = 0; i < size; i++) { - if(!stream_config) { - g_ctx->config.stream_config = calloc(1, sizeof(bbl_stream_config_s)); - stream_config = g_ctx->config.stream_config; - } else { - stream_config->next = calloc(1, sizeof(bbl_stream_config_s)); - stream_config = stream_config->next; - } - if(!json_parse_stream(json_array_get(section, i), stream_config)) { - return false; - } + } else { + return true; + } + + /* Get tail end of stream-config list. */ + if(stream_config) { + while(stream_config->next) { + stream_config = stream_config->next; + } + } + /* Config is provided as array (multiple streams) */ + size = json_array_size(section); + for(i = 0; i < size; i++) { + if(!stream_config) { + g_ctx->config.stream_config = calloc(1, sizeof(bbl_stream_config_s)); + stream_config = g_ctx->config.stream_config; + } else { + stream_config->next = calloc(1, sizeof(bbl_stream_config_s)); + stream_config = stream_config->next; + } + if(!json_parse_stream(json_array_get(section, i), stream_config)) { + return false; } } return true; @@ -3571,7 +3585,7 @@ json_parse_config(json_t *root) if(json_is_object(section)) { const char *schema[] = { - "io-mode", "io-slots", "qdisc-bypass", + "io-mode", "io-slots", "io-burst", "qdisc-bypass", "tx-interval", "rx-interval", "tx-threads", "rx-threads", "capture-include-streams", "mac-modifier", "lag", "network", "access", "a10nsp", "links" @@ -3608,6 +3622,11 @@ json_parse_config(json_t *root) if(value) { g_ctx->config.io_slots = json_number_value(value); } + value = json_object_get(section, "io-burst"); + JSON_OBJ_GET_NUMBER(section, value, "interfaces", "io-burst", 1, 65535); + if(value) { + g_ctx->config.io_burst = json_number_value(value); + } JSON_OBJ_GET_BOOL(section, value, "interfaces", "qdisc-bypass"); if(value) { g_ctx->config.qdisc_bypass = json_boolean_value(value); @@ -4041,9 +4060,10 @@ bbl_config_init_defaults() g_ctx->pcap.include_streams = false; g_ctx->config.username = g_default_user; g_ctx->config.password = g_default_pass; - g_ctx->config.tx_interval = 1 * MSEC; - g_ctx->config.rx_interval = 1 * MSEC; + g_ctx->config.tx_interval = 0.1 * MSEC; + g_ctx->config.rx_interval = 0.1 * MSEC; g_ctx->config.io_slots = 4096; + g_ctx->config.io_burst = 256; g_ctx->config.io_max_stream_len = 9000; g_ctx->config.qdisc_bypass = true; g_ctx->config.sessions = 1; @@ -4099,7 +4119,7 @@ bbl_config_init_defaults() g_ctx->config.multicast_traffic_pps = 1000; g_ctx->config.traffic_autostart = true; g_ctx->config.stream_rate_calc = true; - g_ctx->config.stream_max_burst = 32; + g_ctx->config.stream_max_burst = 16; g_ctx->config.multicast_traffic_autostart = true; g_ctx->config.session_traffic_autostart = true; } \ No newline at end of file diff --git a/code/bngblaster/src/bbl_config.h b/code/bngblaster/src/bbl_config.h index e71e146b..63b4c99d 100644 --- a/code/bngblaster/src/bbl_config.h +++ b/code/bngblaster/src/bbl_config.h @@ -149,6 +149,7 @@ typedef struct bbl_link_config_ uint16_t io_slots_tx; uint16_t io_slots_rx; + uint16_t io_burst; bool qdisc_bypass; diff --git a/code/bngblaster/src/bbl_ctx.c b/code/bngblaster/src/bbl_ctx.c index e8bc8bf5..02860b8d 100644 --- a/code/bngblaster/src/bbl_ctx.c +++ b/code/bngblaster/src/bbl_ctx.c @@ -80,7 +80,6 @@ bbl_ctx_add() g_ctx->vlan_session_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key64, bbl_key64_hash, BBL_SESSION_HASHTABLE_SIZE); g_ctx->l2tp_session_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key32, bbl_key32_hash, BBL_SESSION_HASHTABLE_SIZE); g_ctx->li_flow_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key32, bbl_key32_hash, BBL_LI_HASHTABLE_SIZE); - g_ctx->stream_flow_dict = hashtable_dict_new((dict_compare_func)bbl_compare_key64, bbl_key64_hash, BBL_STREAM_FLOW_HASHTABLE_SIZE); return true; } @@ -119,13 +118,14 @@ bbl_ctx_del() { bbl_session_free(p); } } - free(g_ctx->session_list); - + + if(g_ctx->session_list) free(g_ctx->session_list); + if(g_ctx->stream_index) free(g_ctx->stream_index); + /* Free hash table dictionaries. */ dict_free(g_ctx->vlan_session_dict, NULL); dict_free(g_ctx->l2tp_session_dict, NULL); dict_free(g_ctx->li_flow_dict, NULL); - dict_free(g_ctx->stream_flow_dict, NULL); pcapng_free(); free(g_ctx); diff --git a/code/bngblaster/src/bbl_ctx.h b/code/bngblaster/src/bbl_ctx.h index 0dd7ca4f..785c2d70 100644 --- a/code/bngblaster/src/bbl_ctx.h +++ b/code/bngblaster/src/bbl_ctx.h @@ -78,7 +78,11 @@ typedef struct bbl_ctx_ dict *vlan_session_dict; /* hashtable for 1:1 vlan sessions */ dict *l2tp_session_dict; /* hashtable for L2TP sessions */ dict *li_flow_dict; /* hashtable for LI flows */ - dict *stream_flow_dict; /* hashtable for traffic stream flows */ + + bbl_stream_s **stream_index; + bbl_stream_s *stream_head; + bbl_stream_s *stream_tail; + uint64_t streams; bbl_stream_group_s *stream_groups; @@ -89,6 +93,7 @@ typedef struct bbl_ctx_ char *ctrl_socket_path; bbl_ctrl_thread_s *ctrl_thread; io_thread_s *io_threads; /* single linked list of threads */ + io_bucket_s *io_bucket; bool tcp; bool dpdk; @@ -146,6 +151,7 @@ typedef struct bbl_ctx_ io_mode_t io_mode; uint16_t io_slots; + uint16_t io_burst; uint16_t io_max_stream_len; bool qdisc_bypass; diff --git a/code/bngblaster/src/bbl_def.h b/code/bngblaster/src/bbl_def.h index cbc9d105..67e5e664 100644 --- a/code/bngblaster/src/bbl_def.h +++ b/code/bngblaster/src/bbl_def.h @@ -53,7 +53,6 @@ #define BBL_SESSION_HASHTABLE_SIZE 128993 /* is a prime number */ #define BBL_LI_HASHTABLE_SIZE 32771 /* is a prime number */ -#define BBL_STREAM_FLOW_HASHTABLE_SIZE 128993 /* is a prime number */ /* Mock Addresses */ #define MOCK_IP_LOCAL 167772170 /* 10.0.0.10 */ diff --git a/code/bngblaster/src/bbl_lag.h b/code/bngblaster/src/bbl_lag.h index 0f066c90..8109c4b9 100644 --- a/code/bngblaster/src/bbl_lag.h +++ b/code/bngblaster/src/bbl_lag.h @@ -20,7 +20,7 @@ typedef struct bbl_lag_ uint8_t active_count; bbl_lag_member_s *active_list[LAG_MEMBER_ACTIVE_MAX]; - uint64_t select; + uint32_t select; CIRCLEQ_ENTRY(bbl_lag_) lag_qnode; CIRCLEQ_HEAD(lag_member_, bbl_lag_member_ ) lag_member_qhead; /* list of member interfaces */ diff --git a/code/bngblaster/src/bbl_protocols.h b/code/bngblaster/src/bbl_protocols.h index 73c4b2ac..e2b42436 100644 --- a/code/bngblaster/src/bbl_protocols.h +++ b/code/bngblaster/src/bbl_protocols.h @@ -309,6 +309,7 @@ typedef enum protocol_error_ { IGNORED, EMPTY, FULL, + WAIT, } protocol_error_t; typedef enum icmpv6_message_ { diff --git a/code/bngblaster/src/bbl_rx.c b/code/bngblaster/src/bbl_rx.c index 495701cf..a8514a56 100644 --- a/code/bngblaster/src/bbl_rx.c +++ b/code/bngblaster/src/bbl_rx.c @@ -14,10 +14,8 @@ bbl_rx_stream_network(bbl_network_interface_s *interface, bbl_ethernet_header_s *eth) { bbl_stream_s *stream; - if(!eth->bbl || memcmp(interface->mac, eth->dst, ETH_ADDR_LEN) != 0) { - return false; - } - stream = bbl_stream_rx(eth, NULL); + if(!eth->bbl) return false; + stream = bbl_stream_rx(eth, interface->mac); if(stream) { if(stream->rx_network_interface == NULL) { stream->rx_network_interface = interface; @@ -32,29 +30,13 @@ bbl_rx_stream_access(bbl_access_interface_s *interface, bbl_ethernet_header_s *eth) { bbl_stream_s *stream; - bbl_session_s *session; - uint32_t session_id = 0; - - if(!(eth->bbl && eth->bbl->type == BBL_TYPE_UNICAST)) { - return false; - } - - session_id |= eth->dst[5]; - session_id |= eth->dst[4] << 8; - session_id |= eth->dst[3] << 16; - - session = bbl_session_get(session_id); - if(session) { - if(session->session_state != BBL_TERMINATED && - session->session_state != BBL_IDLE) { - stream = bbl_stream_rx(eth, session); - if(stream) { - if(stream->rx_access_interface == NULL) { - stream->rx_access_interface = interface; - } - return true; - } + if(!eth->bbl) return false; + stream = bbl_stream_rx(eth, NULL); + if(stream) { + if(stream->rx_access_interface == NULL) { + stream->rx_access_interface = interface; } + return true; } return false; } @@ -64,10 +46,8 @@ bbl_rx_stream_a10nsp(bbl_a10nsp_interface_s *interface, bbl_ethernet_header_s *eth) { bbl_stream_s *stream; - if(!eth->bbl || memcmp(interface->mac, eth->dst, ETH_ADDR_LEN) != 0) { - return false; - } - stream = bbl_stream_rx(eth, NULL); + if(!eth->bbl) return false; + stream = bbl_stream_rx(eth, interface->mac); if(stream) { if(stream->rx_a10nsp_interface == NULL) { stream->rx_a10nsp_interface = interface; @@ -81,19 +61,14 @@ bool bbl_rx_thread(bbl_interface_s *interface, bbl_ethernet_header_s *eth) { - bbl_network_interface_s *network_interface = interface->network; - + bbl_network_interface_s *network_interface; if(interface->state == INTERFACE_DISABLED) { return true; } - - while(network_interface) { - if(network_interface->vlan == eth->vlan_outer) { - return bbl_rx_stream_network(network_interface, eth); - } - network_interface = network_interface->next; - } - if(interface->access) { + network_interface = interface->network_vlan[eth->vlan_outer]; + if(network_interface) { + return bbl_rx_stream_network(network_interface, eth); + } else if(interface->access) { return bbl_rx_stream_access(interface->access, eth); } else if(interface->a10nsp) { return bbl_rx_stream_a10nsp(interface->a10nsp, eth); @@ -130,7 +105,7 @@ bbl_rx_handler(bbl_interface_s *interface, if(!bbl_rx_stream_network(network_interface, eth)) { bbl_network_rx_handler(network_interface, eth); } - } else if(interface->access) { + } else if(interface->access) { if(!bbl_rx_stream_access(interface->access, eth)) { bbl_access_rx_handler(interface->access, eth); } diff --git a/code/bngblaster/src/bbl_session.c b/code/bngblaster/src/bbl_session.c index 151e8dfd..d525948b 100644 --- a/code/bngblaster/src/bbl_session.c +++ b/code/bngblaster/src/bbl_session.c @@ -280,10 +280,6 @@ bbl_session_monkey_job(timer_s *timer) { /** * bbl_session_get * - * This function allows to change the state of a session including - * the required action caused by state changes. - * - * @param ctx global context * @param session_id session-id * @return session or NULL if session not found */ @@ -1749,25 +1745,18 @@ bbl_session_ctrl_traffic_stop(int fd, uint32_t session_id, json_t *arguments __a int bbl_session_ctrl_traffic_reset(int fd, uint32_t session_id __attribute__((unused)), json_t *arguments __attribute__((unused))) { - bbl_stream_s *stream; - struct dict_itor *itor; + bbl_stream_s *stream = g_ctx->stream_head; g_ctx->stats.session_traffic_flows_verified = 0; /* Iterate over all traffic streams */ - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(!stream) { - continue; - } + while(stream) { if(stream->session && stream->session_traffic) { stream->session->session_traffic.flows_verified = 0; bbl_stream_reset(stream); } + stream = stream->next; } - dict_itor_free(itor); return bbl_ctrl_status(fd, "ok", 200, NULL); } diff --git a/code/bngblaster/src/bbl_stats.c b/code/bngblaster/src/bbl_stats.c index c1faba41..58085c3a 100644 --- a/code/bngblaster/src/bbl_stats.c +++ b/code/bngblaster/src/bbl_stats.c @@ -165,8 +165,6 @@ bbl_stats_generate(bbl_stats_s * stats) bbl_stats_update_cps(); bbl_stats_generate_multicast(stats, false); - struct dict_itor *itor; - /* Iterate over all sessions */ for(i = 0; i < g_ctx->sessions; i++) { session = &g_ctx->session_list[i]; @@ -370,36 +368,32 @@ bbl_stats_generate(bbl_stats_s * stats) } /* Iterate over all traffic streams */ - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(stream) { - if(stats->min_stream_loss) { - if(stream->rx_loss < stats->min_stream_loss) stats->min_stream_loss = stream->rx_loss; + stream = g_ctx->stream_head; + while(stream) { + if(stats->min_stream_loss) { + if(stream->rx_loss < stats->min_stream_loss) stats->min_stream_loss = stream->rx_loss; + } else { + stats->min_stream_loss = stream->rx_loss; + } + if(stream->rx_loss > stats->max_stream_loss) stats->max_stream_loss = stream->rx_loss; + + if(stream->rx_first_seq) { + if(stats->min_stream_rx_first_seq) { + if(stream->rx_first_seq < stats->min_stream_rx_first_seq) stats->min_stream_rx_first_seq = stream->rx_first_seq; } else { - stats->min_stream_loss = stream->rx_loss; + stats->min_stream_rx_first_seq = stream->rx_first_seq; } - if(stream->rx_loss > stats->max_stream_loss) stats->max_stream_loss = stream->rx_loss; + if(stream->rx_first_seq > stats->max_stream_rx_first_seq) stats->max_stream_rx_first_seq = stream->rx_first_seq; - if(stream->rx_first_seq) { - if(stats->min_stream_rx_first_seq) { - if(stream->rx_first_seq < stats->min_stream_rx_first_seq) stats->min_stream_rx_first_seq = stream->rx_first_seq; - } else { - stats->min_stream_rx_first_seq = stream->rx_first_seq; - } - if(stream->rx_first_seq > stats->max_stream_rx_first_seq) stats->max_stream_rx_first_seq = stream->rx_first_seq; - - if(stats->min_stream_delay_us) { - if(stream->rx_min_delay_us < stats->min_stream_delay_us) stats->min_stream_delay_us = stream->rx_min_delay_us; - } else { - stats->min_stream_delay_us = stream->rx_min_delay_us; - } - if(stream->rx_max_delay_us > stats->max_stream_delay_us) stats->max_stream_delay_us = stream->rx_max_delay_us; + if(stats->min_stream_delay_us) { + if(stream->rx_min_delay_us < stats->min_stream_delay_us) stats->min_stream_delay_us = stream->rx_min_delay_us; + } else { + stats->min_stream_delay_us = stream->rx_min_delay_us; } + if(stream->rx_max_delay_us > stats->max_stream_delay_us) stats->max_stream_delay_us = stream->rx_max_delay_us; } + stream = stream->next; } - dict_itor_free(itor); } void @@ -805,8 +799,6 @@ bbl_stats_json(bbl_stats_s * stats) bbl_session_s *session; bbl_stream_s *stream; - struct dict_itor *itor; - json_t *root = NULL; json_t *jobj = NULL; json_t *jobj_array = NULL; @@ -1204,18 +1196,14 @@ bbl_stats_json(bbl_stats_s * stats) if(g_ctx->config.json_report_streams) { jobj_array = json_array(); - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(stream) { - jobj_sub = bbl_stream_json(stream); - if(jobj_sub) { - json_array_append(jobj_array, jobj_sub); - } + stream = g_ctx->stream_head; + while(stream) { + jobj_sub = bbl_stream_json(stream); + if(jobj_sub) { + json_array_append(jobj_array, jobj_sub); } + stream = stream->next; } - dict_itor_free(itor); json_object_set(jobj, "streams", jobj_array); } diff --git a/code/bngblaster/src/bbl_stream.c b/code/bngblaster/src/bbl_stream.c index ae1ed004..1facd2c4 100644 --- a/code/bngblaster/src/bbl_stream.c +++ b/code/bngblaster/src/bbl_stream.c @@ -21,6 +21,43 @@ const char g_session_traffic_ipv6[] = "session-ipv6"; const char g_session_traffic_ipv6pd[] = "session-ipv6pd"; endpoint_state_t g_endpoint = ENDPOINT_ACTIVE; +/** + * bbl_stream_fast_index_get + * + * @param flow_id flow-id + * @return stream or NULL if stream not found + */ +bbl_stream_s * +bbl_stream_index_get(uint64_t flow_id) +{ + if(g_ctx->stream_index && flow_id <= g_ctx->streams && flow_id > 0) { + return g_ctx->stream_index[flow_id-1]; + } + return NULL; +} + +/** + * bbl_stream_index_init + */ +bool +bbl_stream_index_init() +{ + uint64_t flow_id; + bbl_stream_s *stream = g_ctx->stream_head; + + g_ctx->stream_index = calloc(g_ctx->streams, sizeof(bbl_stream_s*)); + + while(stream) { + flow_id = stream->flow_id; + if(flow_id > g_ctx->streams || flow_id < 1) { + return false; + } + g_ctx->stream_index[flow_id-1] = stream; + stream = stream->next; + } + return true; +} + static void bbl_stream_delay(bbl_stream_s *stream, struct timespec *rx_timestamp, struct timespec *bbl_timestamp) { @@ -1253,18 +1290,11 @@ bbl_stream_ctrl(bbl_stream_s *stream) void bbl_stream_final() { - struct dict_itor *itor; - bbl_stream_s *stream; - - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(stream) { - bbl_stream_ctrl(stream); - } + bbl_stream_s *stream = g_ctx->stream_head; + while(stream) { + bbl_stream_ctrl(stream); + stream = stream->next; } - dict_itor_free(itor); } static bool @@ -1336,21 +1366,29 @@ bbl_stream_can_send(bbl_stream_s *stream) return false; } -void -bbl_stream_tx_qnode_insert(io_handle_s *io, bbl_stream_s *stream) +static bool +bbl_stream_session_can_send(bbl_stream_s *stream) { - if(CIRCLEQ_NEXT(stream, tx_qnode)) { - return; - } - CIRCLEQ_INSERT_TAIL(&io->stream_tx_qhead, stream, tx_qnode); -} + bbl_session_s *session = stream->session; -void -bbl_stream_tx_qnode_remove(io_handle_s *io, bbl_stream_s *stream) -{ - CIRCLEQ_REMOVE(&io->stream_tx_qhead, stream, tx_qnode); - CIRCLEQ_NEXT(stream, tx_qnode) = NULL; - CIRCLEQ_PREV(stream, tx_qnode) = NULL; + if(session) { + if(stream->session_traffic) { + if(!session->session_traffic.active) { + return false; + } + } else if(!session->streams.active) { + return false; + } + if(stream->session_version != session->version) { + if(stream->tx_buf) { + free(stream->tx_buf); + stream->tx_buf = NULL; + stream->tx_len = 0; + } + stream->session_version = session->version; + } + } + return true; } static void @@ -1368,48 +1406,11 @@ bbl_stream_update_tcp(bbl_stream_s *stream) } } -protocol_error_t -bbl_stream_tx(io_handle_s *io, uint8_t *buf, uint16_t *len) -{ - bbl_stream_s *stream; - if(!CIRCLEQ_EMPTY(&io->stream_tx_qhead)) { - stream = CIRCLEQ_FIRST(&io->stream_tx_qhead); - if(stream->token_bucket && stream->tx_buf) { - /* Update BBL header fields */ - *(uint64_t*)(stream->tx_buf + (stream->tx_len - 16)) = stream->flow_seq; - *(uint32_t*)(stream->tx_buf + (stream->tx_len - 8)) = io->timestamp.tv_sec; - *(uint32_t*)(stream->tx_buf + (stream->tx_len - 4)) = io->timestamp.tv_nsec; - if(stream->tcp) { - bbl_stream_update_tcp(stream); - } - *len = stream->tx_len; - memcpy(buf, stream->tx_buf, *len); - stream->token_bucket--; - stream->tx_packets++; - - if(stream->flow_seq == 1) { - stream->tx_first_epoch = io->timestamp.tv_sec; - } - stream->flow_seq++; - - /* Remove only from TX queue if all tokens are consumed! */ - bbl_stream_tx_qnode_remove(io, stream); - if(stream->token_bucket) { - /* Move to the end. */ - bbl_stream_tx_qnode_insert(io, stream); - } - } else { - bbl_stream_tx_qnode_remove(io, stream); - } - return PROTOCOL_SUCCESS; - } - return EMPTY; -} - static bool bbl_stream_lag(bbl_stream_s *stream) { bbl_lag_s *lag = stream->tx_interface->lag; + bbl_stream_s *s; io_handle_s *io; uint8_t key; @@ -1422,132 +1423,143 @@ bbl_stream_lag(bbl_stream_s *stream) key = stream->flow_id % lag->active_count; io = lag->active_list[key]->interface->io.tx; if(stream->io != io) { - if(CIRCLEQ_NEXT(stream, tx_qnode)) { - bbl_stream_tx_qnode_remove(stream->io, stream); + s = stream->io->stream_head; + if(s == stream) { + stream->io->stream_head = s->io_next; + } else { + while(s) { + if(s->io_next == stream) { + s->io_next = stream->io_next; + s = NULL; + } + } + } + if(stream->io->stream_pps > stream->config->pps) { + stream->io->stream_pps -= stream->config->pps; } + io->stream_pps += stream->config->pps; stream->io = io; + stream->io_next = io->stream_head; + io->stream_head = stream; + io->stream_cur = stream; + + stream->io->stream_cur = stream->io->stream_head; } } return true; } -void -bbl_stream_token_job(timer_s *timer) +static void +bbl_stream_setup(bbl_stream_s *stream) +{ + uint64_t setup_tokens; + if(stream->tx_packets == 0) { + setup_tokens = (rand() % stream->config->setup_interval) * stream->config->pps; + } else { + setup_tokens = stream->config->setup_interval * stream->config->pps; + } + if(setup_tokens) setup_tokens--; + stream->tokens += setup_tokens * IO_TOKENS_PER_PACKET; +} + +protocol_error_t +bbl_stream_io_send(io_handle_s *io, bbl_stream_s *stream) { - bbl_stream_s *stream = timer->data; - bbl_session_s *session = stream->session; struct timespec time_elapsed; + bbl_stream_config_s *config; - uint64_t packets_expected; - uint64_t packets_send; + if(stream->active && stream->io_bucket->tokens <= stream->tokens) { + return WAIT; + } - if(!bbl_stream_can_send(stream)) { - stream->token_bucket = 0; - stream->send_window_active = false; - return; + /** Enforce optional stream packet limit ... */ + config = stream->config; + if(config->max_packets && + stream->tx_packets >= config->max_packets) { + return FULL; } - if(session) { - if(stream->session_traffic) { - if(!session->session_traffic.active) { - stream->token_bucket = 0; - stream->send_window_active = false; - return; - } - } else if(!session->streams.active) { - stream->token_bucket = 0; - stream->send_window_active = false; - return; - } - if(stream->session_version != session->version) { - if(stream->tx_buf) { - free(stream->tx_buf); - stream->tx_buf = NULL; - stream->tx_len = 0; - } - stream->session_version = session->version; - } + if(!(bbl_stream_can_send(stream) && + bbl_stream_session_can_send(stream))) { + stream->active = false; + return WRONG_PROTOCOL_STATE; } if(stream->lag) { if(!bbl_stream_lag(stream)) { - stream->token_bucket = 0; - stream->send_window_active = false; - return; + stream->active = false; + return WRONG_PROTOCOL_STATE; } } /** Enforce optional stream traffic start delay ... */ - if(stream->config->start_delay && stream->tx_packets == 0) { + if(stream->tx_packets == 0 && config->start_delay) { if(stream->wait) { - timespec_sub(&time_elapsed, timer->timestamp, &stream->wait_start); + timespec_sub(&time_elapsed, &io->timestamp, &stream->wait_start); if(time_elapsed.tv_sec <= stream->config->start_delay) { /** Wait ... */ - return; + return WAIT; } } else { /** Start wait window ... */ stream->wait = true; - stream->wait_start.tv_sec = timer->timestamp->tv_sec; - stream->wait_start.tv_nsec = timer->timestamp->tv_nsec; - return; + stream->wait_start.tv_sec = io->timestamp.tv_sec; + stream->wait_start.tv_nsec = io->timestamp.tv_nsec; + return WAIT; } } - if(stream->setup) { - if(!stream->send_window_start.tv_sec) { - stream->send_window_start.tv_sec = timer->timestamp->tv_sec - (rand() % stream->config->setup_interval); - } - timespec_sub(&time_elapsed, timer->timestamp, &stream->send_window_start); - if(time_elapsed.tv_sec < stream->config->setup_interval) { - stream->token_bucket = 0; - return; - } else { - stream->token_bucket = 1; - stream->send_window_start.tv_sec = timer->timestamp->tv_sec; - } - } else if(stream->send_window_active) { - /** Update send window */ - timespec_sub(&time_elapsed, timer->timestamp, &stream->send_window_start); - packets_expected = time_elapsed.tv_sec * stream->config->pps; - packets_expected += stream->config->pps * ((double)time_elapsed.tv_nsec / 1000000000.0); - packets_send = stream->tx_packets - stream->send_window_start_packets; - if(packets_expected > packets_send) { - stream->token_bucket = packets_expected - packets_send; - if(stream->token_bucket > stream->token_burst) { - stream->token_bucket = stream->token_burst; - } - } else { - stream->token_bucket = 1; - } - } else { - /* Open new send window */ - stream->send_window_active = true; - stream->send_window_start_packets = stream->tx_packets; - stream->send_window_start.tv_sec = timer->timestamp->tv_sec; - stream->send_window_start.tv_nsec = timer->timestamp->tv_nsec; - stream->token_bucket = 1; + if(!stream->active) { + stream->active = true; + stream->tokens = stream->io_bucket->tokens + (rand() % IO_TOKENS_PER_PACKET); } - /** Enforce optional stream packet limit ... */ - if(stream->config->max_packets && - stream->tx_packets + stream->token_bucket > stream->config->max_packets) { - if(stream->tx_packets < stream->config->max_packets) { - stream->token_bucket = stream->config->max_packets - stream->tx_packets; - } else { - stream->token_bucket = 0; - return; - } + if(stream->setup) { + bbl_stream_setup(stream); } if(!stream->tx_buf) { if(!bbl_stream_build_packet(stream)) { LOG(ERROR, "Failed to build packet for stream %s\n", stream->config->name); - stream->token_bucket = 0; - return; + return ENCODE_ERROR; } } - bbl_stream_tx_qnode_insert(stream->io, stream); + + /* Update BBL header fields */ + *(uint64_t*)(stream->tx_buf + (stream->tx_len - 16)) = stream->flow_seq; + *(uint32_t*)(stream->tx_buf + (stream->tx_len - 8)) = io->timestamp.tv_sec; + *(uint32_t*)(stream->tx_buf + (stream->tx_len - 4)) = io->timestamp.tv_nsec; + if(stream->tcp) { + bbl_stream_update_tcp(stream); + } + if(stream->flow_seq == 1) { + stream->tx_first_epoch = io->timestamp.tv_sec; + } + stream->tokens += IO_TOKENS_PER_PACKET; + return PROTOCOL_SUCCESS; +} + +bbl_stream_s* +bbl_stream_io_send_iter(io_handle_s *io) +{ + bbl_stream_s *stream = io->stream_cur; + bbl_stream_s *start = stream; + + while(stream) { + if(bbl_stream_io_send(io, stream) == PROTOCOL_SUCCESS) { + return stream; + } + if(stream->io_next) { + stream = stream->io_next; + } else { + stream = io->stream_head; + } + io->stream_cur = stream; + if(stream == start) { + break; + } + } + return NULL; } void @@ -1600,6 +1612,9 @@ bbl_stream_select_io_lag(bbl_stream_s *stream) CIRCLEQ_FOREACH(member, &lag->lag_member_qhead, lag_member_qnode) { if(!stream->io) { stream->io = member->interface->io.tx; + stream->io_next = stream->io->stream_head; + stream->io->stream_head = stream; + stream->io->stream_cur = stream; } member->interface->io.tx->stream_pps += stream->config->pps; } @@ -1619,6 +1634,9 @@ bbl_stream_select_io(bbl_stream_s *stream) } io->stream_pps += stream->config->pps; stream->io = io; + stream->io_next = io->stream_head; + io->stream_head = stream; + io->stream_cur = stream; if(io->thread) { stream->threaded = true; } @@ -1627,33 +1645,23 @@ bbl_stream_select_io(bbl_stream_s *stream) static void bbl_stream_add(bbl_stream_s *stream) { - time_t timer_sec; - long timer_nsec; - bbl_stream_add_group(stream); if(stream->tx_interface->type == LAG_INTERFACE) { bbl_stream_select_io_lag(stream); } else { bbl_stream_select_io(stream); } - - stream->token_burst = g_ctx->config.stream_max_burst; - - /* Calculate timer. */ - timer_sec = stream->tx_interval / 1000000000; - timer_nsec = stream->tx_interval % 1000000000; - if(stream->io && stream->io->thread) { - timer_add_periodic(&stream->io->thread->timer.root, &stream->tx_timer, "Stream Tokens", - timer_sec, timer_nsec, stream, &bbl_stream_token_job); - } else { - timer_add_periodic(&g_ctx->timer_root, &stream->tx_timer, "Stream Tokens", - timer_sec, timer_nsec, stream, &bbl_stream_token_job); - } - stream->tx_timer->reset = false; - + io_bucket_stream(stream); if(stream->config->setup_interval) { stream->setup = true; } + if(g_ctx->stream_head) { + g_ctx->stream_tail->next = stream; + } else { + g_ctx->stream_head = stream; + } + g_ctx->stream_tail = stream; + g_ctx->streams++; g_ctx->total_pps += stream->config->pps; } @@ -1666,10 +1674,6 @@ bbl_stream_session_add(bbl_stream_config_s *config, bbl_session_s *session) bbl_stream_s *stream_up = NULL; bbl_stream_s *stream_down = NULL; - dict_insert_result result; - - uint64_t tx_interval = 0; - assert(config); assert(session); @@ -1701,7 +1705,6 @@ bbl_stream_session_add(bbl_stream_config_s *config, bbl_session_s *session) return true; } - tx_interval = SEC / config->pps; if(config->direction & BBL_DIRECTION_UP) { if(config->type == BBL_SUB_TYPE_IPV4) { if(!((network_interface && network_interface->ip.address) || @@ -1749,14 +1752,6 @@ bbl_stream_session_add(bbl_stream_config_s *config, bbl_session_s *session) } stream_up->access_interface = access_interface; stream_up->tx_interface = access_interface->interface; - stream_up->tx_interval = tx_interval; - result = dict_insert(g_ctx->stream_flow_dict, &stream_up->flow_id); - if(!result.inserted) { - LOG(ERROR, "Failed to insert stream %s (upstream)\n", config->name); - free(stream_up); - return false; - } - *result.datum_ptr = stream_up; stream_up->session_next = session->streams.head; session->streams.head = stream_up; bbl_stream_add(stream_up); @@ -1798,15 +1793,7 @@ bbl_stream_session_add(bbl_stream_config_s *config, bbl_session_s *session) if(stream_down->config->raw_tcp) { stream_down->tcp = true; } - stream_down->tx_interval = tx_interval; stream_down->session_traffic = config->session_traffic; - result = dict_insert(g_ctx->stream_flow_dict, &stream_down->flow_id); - if(!result.inserted) { - LOG(ERROR, "Failed to insert stream %s (downstream)\n", config->name); - free(stream_down); - return false; - } - *result.datum_ptr = stream_down; stream_down->session_next = session->streams.head; session->streams.head = stream_down; if(network_interface) { @@ -1920,10 +1907,6 @@ bbl_stream_init() { bbl_stream_s *stream; bbl_network_interface_s *network_interface; - - dict_insert_result result; - - uint64_t tx_interval = 0; int i; uint32_t group; @@ -1939,7 +1922,6 @@ bbl_stream_init() { return false; } - tx_interval = SEC / config->pps; if(config->direction & BBL_DIRECTION_DOWN) { stream = calloc(1, sizeof(bbl_stream_s)); stream->endpoint = &g_endpoint; @@ -1958,19 +1940,11 @@ bbl_stream_init() { stream->direction = BBL_DIRECTION_DOWN; stream->network_interface = network_interface; stream->tx_interface = network_interface->interface; - stream->tx_interval = tx_interval; if(network_interface->ldp_adjacency && (config->ipv4_ldp_lookup_address || *(uint64_t*)stream->config->ipv6_ldp_lookup_address)) { stream->ldp_lookup = true; } - result = dict_insert(g_ctx->stream_flow_dict, &stream->flow_id); - if(!result.inserted) { - LOG(ERROR, "Failed to insert RAW stream %s\n", config->name); - free(stream); - return false; - } - *result.datum_ptr = stream; bbl_stream_add(stream); if(stream->type == BBL_TYPE_MULTICAST) { LOG(DEBUG, "RAW multicast traffic stream %s added to %s with %0.2lf PPS\n", @@ -1993,7 +1967,6 @@ bbl_stream_init() { return false; } - tx_interval = SEC / g_ctx->config.multicast_traffic_pps; for(i = 0; i < g_ctx->config.igmp_group_count; i++) { group = be32toh(g_ctx->config.igmp_group) + i * be32toh(g_ctx->config.igmp_group_iter); @@ -2029,14 +2002,6 @@ bbl_stream_init() { stream->direction = BBL_DIRECTION_DOWN; stream->network_interface = network_interface; stream->tx_interface = network_interface->interface; - stream->tx_interval = tx_interval; - result = dict_insert(g_ctx->stream_flow_dict, &stream->flow_id); - if(!result.inserted) { - LOG(ERROR, "Failed to insert multicast stream %s\n", config->name); - free(stream); - return false; - } - *result.datum_ptr = stream; bbl_stream_add(stream); LOG(DEBUG, "Autogenerated multicast traffic stream added to %s with %0.2lf PPS\n", network_interface->name, config->pps); @@ -2211,12 +2176,12 @@ bbl_stream_rx_nat(bbl_ethernet_header_s *eth, bbl_stream_s *stream) { } bbl_stream_s * -bbl_stream_rx(bbl_ethernet_header_s *eth, bbl_session_s *session) +bbl_stream_rx(bbl_ethernet_header_s *eth, uint8_t *mac) { bbl_bbl_s *bbl = eth->bbl; bbl_stream_s *stream; + bbl_session_s *session; bbl_mpls_s *mpls; - void **search = NULL; uint64_t loss = 0; @@ -2224,22 +2189,15 @@ bbl_stream_rx(bbl_ethernet_header_s *eth, bbl_session_s *session) return NULL; } - search = dict_search(g_ctx->stream_flow_dict, &bbl->flow_id); - if(search) { - stream = *search; + stream = bbl_stream_index_get(bbl->flow_id); + if(stream) { if(stream->rx_first_seq) { /* Stream already verified */ if((stream->rx_last_seq +1) < bbl->flow_seq) { loss = bbl->flow_seq - (stream->rx_last_seq +1); stream->rx_loss += loss; - if(session) { - LOG(LOSS, "LOSS (ID: %u) Unicast flow: %lu seq: %lu last: %lu\n", - session->session_id, bbl->flow_id, bbl->flow_seq, stream->rx_last_seq); - - } else { - LOG(LOSS, "LOSS Unicast flow: %lu seq: %lu last: %lu\n", - bbl->flow_id, bbl->flow_seq, stream->rx_last_seq); - } + LOG(LOSS, "LOSS Unicast flow: %lu seq: %lu last: %lu\n", + bbl->flow_id, bbl->flow_seq, stream->rx_last_seq); } } else { /* Verify stream ... */ @@ -2279,13 +2237,30 @@ bbl_stream_rx(bbl_ethernet_header_s *eth, bbl_session_s *session) bbl->direction != stream->direction) { return NULL; } - if(session && stream->session_traffic) { - if(bbl->outer_vlan_id != session->vlan_key.outer_vlan_id || - bbl->inner_vlan_id != session->vlan_key.inner_vlan_id || - bbl->session_id != session->session_id) { - stream->rx_wrong_session++; + if(mac) { + if(memcmp(mac, eth->dst, ETH_ADDR_LEN) != 0) { + return NULL; + } + } else { + session = stream->session; + if(!session) { + return NULL; + } + if(session->session_state == BBL_TERMINATED || + session->session_state == BBL_IDLE) { return NULL; } + if(memcmp(session->client_mac, eth->dst, ETH_ADDR_LEN) != 0) { + return NULL; + } + if(stream->session_traffic) { + if(bbl->outer_vlan_id != session->vlan_key.outer_vlan_id || + bbl->inner_vlan_id != session->vlan_key.inner_vlan_id || + bbl->session_id != session->session_id) { + stream->rx_wrong_session++; + return NULL; + } + } } if(stream->nat && stream->direction == BBL_DIRECTION_UP) { bbl_stream_rx_nat(eth, stream); @@ -2308,39 +2283,31 @@ bbl_stream_rx(bbl_ethernet_header_s *eth, bbl_session_s *session) static json_t * bbl_stream_summary_json() { - struct dict_itor *itor; - bbl_stream_s *stream; - + bbl_stream_s *stream = g_ctx->stream_head; json_t *jobj, *jobj_array; - jobj_array = json_array(); - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(stream) { - jobj = json_pack("{si ss* ss ss ss sI sI sI sI sI }", - "flow-id", stream->flow_id, - "name", stream->config->name, - "type", stream_type_string(stream), - "sub-type", stream_sub_type_string(stream), - "direction", stream->direction == BBL_DIRECTION_UP ? "upstream" : "downstream", - "tx-packets", stream->tx_packets - stream->reset_packets_tx, - "tx-bytes", (stream->tx_packets - stream->reset_packets_tx) * stream->tx_len, - "rx-packets", stream->rx_packets - stream->reset_packets_rx, - "rx-bytes", (stream->rx_packets - stream->reset_packets_rx) * stream->rx_len, - "rx-loss", stream->rx_loss - stream->reset_loss); - if(jobj) { - if(stream->session) { - json_object_set(jobj, "session-id", json_integer(stream->session->session_id)); - json_object_set(jobj, "session-traffic", json_boolean(stream->session_traffic)); - } - json_array_append(jobj_array, jobj); + while(stream) { + jobj = json_pack("{si ss* ss ss ss sI sI sI sI sI }", + "flow-id", stream->flow_id, + "name", stream->config->name, + "type", stream_type_string(stream), + "sub-type", stream_sub_type_string(stream), + "direction", stream->direction == BBL_DIRECTION_UP ? "upstream" : "downstream", + "tx-packets", stream->tx_packets - stream->reset_packets_tx, + "tx-bytes", (stream->tx_packets - stream->reset_packets_tx) * stream->tx_len, + "rx-packets", stream->rx_packets - stream->reset_packets_rx, + "rx-bytes", (stream->rx_packets - stream->reset_packets_rx) * stream->rx_len, + "rx-loss", stream->rx_loss - stream->reset_loss); + if(jobj) { + if(stream->session) { + json_object_set(jobj, "session-id", json_integer(stream->session->session_id)); + json_object_set(jobj, "session-traffic", json_boolean(stream->session_traffic)); } + json_array_append(jobj_array, jobj); } + stream = stream->next; } - dict_itor_free(itor); return jobj_array; } @@ -2508,7 +2475,6 @@ bbl_stream_ctrl_info(int fd, uint32_t session_id __attribute__((unused)), json_t json_t *json_stream = NULL; bbl_stream_s *stream; - void **search = NULL; int number = 0; uint64_t flow_id; @@ -2519,9 +2485,8 @@ bbl_stream_ctrl_info(int fd, uint32_t session_id __attribute__((unused)), json_t } flow_id = number; - search = dict_search(g_ctx->stream_flow_dict, &flow_id); - if(search) { - stream = *search; + stream = bbl_stream_index_get(flow_id); + if(stream) { json_stream = bbl_stream_json(stream); root = json_pack("{ss si so*}", "status", "ok", @@ -2668,24 +2633,17 @@ bbl_stream_ctrl_traffic_stop(int fd, uint32_t session_id, json_t *arguments) int bbl_stream_ctrl_reset(int fd, uint32_t session_id __attribute__((unused)), json_t *arguments __attribute__((unused))) { - bbl_stream_s *stream; - struct dict_itor *itor; + bbl_stream_s *stream = g_ctx->stream_head; g_ctx->stats.stream_traffic_flows_verified = 0; /* Iterate over all traffic streams */ - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(!stream) { - continue; - } + while(stream) { if(!stream->session_traffic) { bbl_stream_reset(stream); } + stream = stream->next; } - dict_itor_free(itor); return bbl_ctrl_status(fd, "ok", 200, NULL); } @@ -2693,23 +2651,18 @@ int bbl_stream_ctrl_pending(int fd, uint32_t session_id __attribute__((unused)), json_t *arguments __attribute__((unused))) { int result = 0; - bbl_stream_s *stream; - struct dict_itor *itor; + bbl_stream_s *stream = g_ctx->stream_head; json_t *root, *json_streams; json_streams = json_array(); /* Iterate over all traffic streams */ - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(!stream) { - continue; - } + /* Iterate over all traffic streams */ + while(stream) { if(!stream->verified) { json_array_append(json_streams, json_integer(stream->flow_id)); } + stream = stream->next; } root = json_pack("{ss si so}", @@ -2730,14 +2683,11 @@ static int bbl_stream_ctrl_start_stop(int fd, uint32_t session_id, int session_group_id, uint64_t flow_id, bool status) { bbl_session_s *session; - bbl_stream_s *stream; - struct dict_itor *itor; - void **search = NULL; + bbl_stream_s *stream = g_ctx->stream_head; if(flow_id) { - search = dict_search(g_ctx->stream_flow_dict, &flow_id); - if(search) { - stream = *search; + stream = bbl_stream_index_get(flow_id); + if(stream) { stream->stop = status; } else { return bbl_ctrl_status(fd, "warning", 404, "stream not found"); @@ -2755,21 +2705,13 @@ bbl_stream_ctrl_start_stop(int fd, uint32_t session_id, int session_group_id, ui } } else { /* Iterate over all traffic streams */ - itor = dict_itor_new(g_ctx->stream_flow_dict); - dict_itor_first(itor); - for (; dict_itor_valid(itor); dict_itor_next(itor)) { - stream = (bbl_stream_s*)*dict_itor_datum(itor); - if(!stream) { - continue; - } - if(session_group_id && stream->session && - stream->session->session_group_id != session_group_id) { - continue; + while(stream) { + if(!(session_group_id && stream->session && + stream->session->session_group_id != session_group_id)) { + stream->stop = status; } - stream->stop = status; + stream = stream->next; } - dict_itor_free(itor); - } return bbl_ctrl_status(fd, "ok", 200, NULL); } diff --git a/code/bngblaster/src/bbl_stream.h b/code/bngblaster/src/bbl_stream.h index ef5e4a27..a84937ae 100644 --- a/code/bngblaster/src/bbl_stream.h +++ b/code/bngblaster/src/bbl_stream.h @@ -82,63 +82,61 @@ typedef struct bbl_stream_ uint8_t sub_type; uint8_t direction; + bool threaded; + bool session_traffic; + bool active; + bool setup; + bool verified; + bool wait; + bool stop; + bool reset; + bool nat; + bool tcp; + bool lag; + bool ldp_lookup; + + uint32_t session_version; + uint32_t ldp_entry_version; + + uint32_t ipv4_src; + uint32_t ipv4_dst; + + uint8_t *ipv6_src; + uint8_t *ipv6_dst; + + uint16_t tx_len; /* TX length */ + uint16_t tx_bbl_hdr_len; /* TX BBL HDR length */ + uint8_t *tx_buf; /* TX buffer */ + bbl_stream_config_s *config; - bbl_stream_group_s *group; + bbl_stream_s *next; /* Next stream (global) */ + bbl_stream_s *io_next; /* Next stream of same IO handle */ bbl_stream_s *group_next; /* Next stream of same group */ + bbl_stream_s *session_next; /* Next stream of same session */ bbl_stream_s *reverse; /* Reverse stream direction */ - uint32_t session_version; + bbl_stream_group_s *group; bbl_session_s *session; - bbl_stream_s *session_next; /* Next stream of same session */ endpoint_state_t *endpoint; io_handle_s *io; + io_bucket_s *io_bucket; bbl_access_interface_s *access_interface; bbl_network_interface_s *network_interface; bbl_a10nsp_interface_s *a10nsp_interface; - struct timer_ *tx_timer; bbl_interface_s *tx_interface; /* TX interface */ - uint8_t *tx_buf; /* TX buffer */ - uint16_t tx_len; /* TX length */ - uint16_t tx_bbl_hdr_len; /* TX BBL HDR length */ - uint64_t tx_interval; /* TX interval in nsec */ - __time_t tx_first_epoch; - - uint32_t ipv4_src; - uint32_t ipv4_dst; - uint8_t *ipv6_src; - uint8_t *ipv6_dst; - - bool threaded; - bool session_traffic; - bool setup; - bool verified; - bool wait; - bool stop; - bool reset; - bool nat; - bool tcp; - bool lag; - bool ldp_lookup; - uint32_t ldp_entry_version; ldp_db_entry_s *ldp_entry; - bool send_window_active; - uint64_t send_window_start_packets; - struct timespec send_window_start; - - struct timespec wait_start; - - CIRCLEQ_ENTRY(bbl_stream_) tx_qnode; - uint32_t token_bucket; - uint32_t token_burst; + uint64_t tx_packets; + uint64_t tokens; + uint32_t lag_select; - uint64_t lag_select; + __time_t tx_first_epoch; - uint64_t tx_packets; + struct timespec wait_start; char _pad0 __attribute__((__aligned__(CACHE_LINE_SIZE))); /* empty cache line */ @@ -194,6 +192,12 @@ typedef struct bbl_stream_ } bbl_stream_s; +bbl_stream_s * +bbl_stream_index_get(uint64_t flow_id); + +bool +bbl_stream_index_init(); + bool bbl_stream_session_init(bbl_session_s *session); @@ -204,10 +208,13 @@ void bbl_stream_final(); protocol_error_t -bbl_stream_tx(io_handle_s *io, uint8_t *buf, uint16_t *len); +bbl_stream_io_send(io_handle_s *io, bbl_stream_s *stream); + +bbl_stream_s* +bbl_stream_io_send_iter(io_handle_s *io); bbl_stream_s * -bbl_stream_rx(bbl_ethernet_header_s *eth, bbl_session_s *session); +bbl_stream_rx(bbl_ethernet_header_s *eth, uint8_t *mac); void bbl_stream_reset(bbl_stream_s *stream); diff --git a/code/bngblaster/src/io/io.c b/code/bngblaster/src/io/io.c index 9d64386f..78e118fc 100644 --- a/code/bngblaster/src/io/io.c +++ b/code/bngblaster/src/io/io.c @@ -7,36 +7,4 @@ * SPDX-License-Identifier: BSD-3-Clause */ -#include "io.h" - -void -io_update_stream_token_bucket(io_handle_s *io) -{ - io->stream_tokens += io->stream_rate; - if(io->stream_tokens > io->stream_burst) { - io->stream_tokens = io->stream_burst; - } -} - -void -io_init_stream_token_bucket() -{ - bbl_interface_s *interface; - io_handle_s *io; - double rate; - - CIRCLEQ_FOREACH(interface, &g_ctx->interface_qhead, interface_qnode) { - io = interface->io.tx; - while(io) { - rate = (io->stream_pps / (io->interface->config->tx_interval / 1000)) * 1.3; - io->stream_rate = rate; - if(rate - (double)io->stream_rate) { - /* Roundup. */ - io->stream_rate++; - } - io->stream_tokens = 0; - io->stream_burst = io->stream_rate * 3; - io = io->next; - } - } -} \ No newline at end of file +#include "io.h" \ No newline at end of file diff --git a/code/bngblaster/src/io/io.h b/code/bngblaster/src/io/io.h index 0c0a6d72..4a931905 100644 --- a/code/bngblaster/src/io/io.h +++ b/code/bngblaster/src/io/io.h @@ -19,6 +19,7 @@ #include "../bbl_txq.h" #include "io_def.h" +#include "io_bucket.h" #include "io_socket.h" #include "io_interface.h" #include "io_thread.h" @@ -30,10 +31,4 @@ #include "io_dpdk.h" #endif -void -io_update_stream_token_bucket(io_handle_s *io); - -void -io_init_stream_token_bucket(); - #endif \ No newline at end of file diff --git a/code/bngblaster/src/io/io_bucket.c b/code/bngblaster/src/io/io_bucket.c new file mode 100644 index 00000000..7cd74606 --- /dev/null +++ b/code/bngblaster/src/io/io_bucket.c @@ -0,0 +1,63 @@ +/* + * BNG Blaster (BBL) - IO Bucket + * + * Christian Giese, January 2024 + * + * Copyright (C) 2020-2023, RtBrick, Inc. + * SPDX-License-Identifier: BSD-3-Clause + */ +#include "io.h" + +void +io_tocken_job(timer_s *timer) +{ + io_bucket_s *io_bucket = timer->data; + struct timespec time_elapsed; + uint64_t tokens; + if(!io_bucket->timestamp_start.tv_sec) { + io_bucket->timestamp_start.tv_sec = timer->timestamp->tv_sec; + io_bucket->timestamp_start.tv_nsec = timer->timestamp->tv_nsec; + } + timespec_sub(&time_elapsed, timer->timestamp, &io_bucket->timestamp_start); + tokens = ((io_bucket->pps * time_elapsed.tv_sec) * IO_TOKENS_PER_PACKET) + + ((io_bucket->pps * time_elapsed.tv_nsec) / 1000000.0); + io_bucket->tokens = tokens; +} + +static io_bucket_s * +io_bucket_add(double pps) +{ + time_t timer_sec; + long timer_nsec; + uint64_t tx_interval = 0; + + io_bucket_s *io_bucket = calloc(1, sizeof(io_bucket_s)); + io_bucket->next = g_ctx->io_bucket; + g_ctx->io_bucket = io_bucket; + io_bucket->pps = pps; + + tx_interval = SEC / (pps*IO_TOKENS_PER_PACKET); + timer_sec = tx_interval / SEC; + timer_nsec = tx_interval % SEC; + timer_add_periodic(&g_ctx->timer_root, &io_bucket->timer, + "TB", timer_sec, timer_nsec, + io_bucket, &io_tocken_job); + + return io_bucket; +} + +void +io_bucket_stream(bbl_stream_s *stream) +{ + io_bucket_s *io_bucket = g_ctx->io_bucket; + while(io_bucket) { + if(io_bucket->pps == stream->config->pps) { + break; + } + io_bucket = io_bucket->next; + } + if(!io_bucket) { + io_bucket = io_bucket_add(stream->config->pps); + } + stream->io_bucket = io_bucket; +} \ No newline at end of file diff --git a/code/bngblaster/src/io/io_bucket.h b/code/bngblaster/src/io/io_bucket.h new file mode 100644 index 00000000..8089f948 --- /dev/null +++ b/code/bngblaster/src/io/io_bucket.h @@ -0,0 +1,15 @@ +/* + * BNG Blaster (BBL) - IO Bucket + * + * Christian Giese, January 2024 + * + * Copyright (C) 2020-2023, RtBrick, Inc. + * SPDX-License-Identifier: BSD-3-Clause + */ +#ifndef __BBL_IO_BUCKET_H__ +#define __BBL_IO_BUCKET_H__ + +void +io_bucket_stream(bbl_stream_s *stream); + +#endif \ No newline at end of file diff --git a/code/bngblaster/src/io/io_def.h b/code/bngblaster/src/io/io_def.h index 59cac81b..40d21a35 100644 --- a/code/bngblaster/src/io/io_def.h +++ b/code/bngblaster/src/io/io_def.h @@ -9,6 +9,8 @@ #ifndef __BBL_IO_DEF_H__ #define __BBL_IO_DEF_H__ +#define IO_TOKENS_PER_PACKET 1000 + typedef struct io_handle_ io_handle_s; typedef struct io_thread_ io_thread_s; @@ -38,6 +40,15 @@ typedef enum { IO_MODE_AF_XDP /* AF_XDP */ } __attribute__ ((__packed__)) io_mode_t; +typedef struct io_bucket_ { + double pps; + uint64_t tokens; + uint64_t tokens_last; + struct timer_ *timer; + struct timespec timestamp_start; + struct io_bucket_ *next; +} io_bucket_s; + typedef struct io_handle_ { io_mode_t mode; io_direction_t direction; @@ -70,10 +81,8 @@ typedef struct io_handle_ { uint16_t vlan_tpid; double stream_pps; - uint32_t stream_tokens; - uint32_t stream_rate; - uint32_t stream_burst; - CIRCLEQ_HEAD(stream_tx_, bbl_stream_) stream_tx_qhead; + bbl_stream_s *stream_head; + bbl_stream_s *stream_cur; struct timespec timestamp; /* user space timestamps */ @@ -93,7 +102,6 @@ typedef struct io_handle_ { } io_handle_s; typedef void (*io_thread_cb_fn)(io_thread_s *thread); -typedef bool (*io_thread_stream_cb_fn)(bbl_stream_s *stream); typedef struct io_thread_ { pthread_t thread; @@ -108,18 +116,11 @@ typedef struct io_thread_ { io_thread_cb_fn run_fn; io_thread_cb_fn teardown_fn; - io_thread_stream_cb_fn stream_tx_fn; - uint8_t *sp; io_handle_s *io; bbl_txq_s *txq; - struct { - struct timer_root_ root; - struct timer_ *io; - } timer; - struct io_thread_ *next; } io_thread_s; diff --git a/code/bngblaster/src/io/io_dpdk.c b/code/bngblaster/src/io/io_dpdk.c index 1a3862e6..416e1f64 100644 --- a/code/bngblaster/src/io/io_dpdk.c +++ b/code/bngblaster/src/io/io_dpdk.c @@ -253,8 +253,6 @@ io_dpdk_tx_job(timer_s *timer) assert(io->direction == IO_EGRESS); assert(io->thread == NULL); - io_update_stream_token_bucket(io); - /* Get TX timestamp */ //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); io->timestamp.tv_sec = timer->timestamp->tv_sec; @@ -380,8 +378,6 @@ io_dpdk_thread_tx_job(timer_s *timer) assert(io->direction == IO_EGRESS); assert(io->thread); - io_update_stream_token_bucket(io); - /* Get TX timestamp */ //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); io->timestamp.tv_sec = timer->timestamp->tv_sec; @@ -557,7 +553,6 @@ io_dpdk_interface_init(bbl_interface_s *interface) io->next = interface->io.rx; interface->io.rx = io; io->interface = interface; - CIRCLEQ_INIT(&io->stream_tx_qhead); if(config->rx_threads) { if(!io_thread_init(io)) { return false; @@ -597,7 +592,6 @@ io_dpdk_interface_init(bbl_interface_s *interface) interface->io.tx = io; io->interface = interface; io->buf = malloc(IO_BUFFER_LEN); - CIRCLEQ_INIT(&io->stream_tx_qhead); if(config->tx_threads) { if(!io_thread_init(io)) { return false; diff --git a/code/bngblaster/src/io/io_interface.c b/code/bngblaster/src/io/io_interface.c index bf14cfd6..adbc8911 100644 --- a/code/bngblaster/src/io/io_interface.c +++ b/code/bngblaster/src/io/io_interface.c @@ -89,7 +89,6 @@ io_interface_init_rx(bbl_interface_s *interface) io->next = interface->io.rx; interface->io.rx = io; io->interface = interface; - CIRCLEQ_INIT(&io->stream_tx_qhead); if(config->rx_threads) { if(!io_thread_init(io)) { return false; @@ -136,7 +135,6 @@ io_interface_init_tx(bbl_interface_s *interface) io->next = interface->io.tx; interface->io.tx = io; io->interface = interface; - CIRCLEQ_INIT(&io->stream_tx_qhead); if(config->tx_threads) { if(!io_thread_init(io)) { return false; diff --git a/code/bngblaster/src/io/io_packet_mmap.c b/code/bngblaster/src/io/io_packet_mmap.c index a1be558a..f2d6dd79 100644 --- a/code/bngblaster/src/io/io_packet_mmap.c +++ b/code/bngblaster/src/io/io_packet_mmap.c @@ -124,7 +124,9 @@ io_packet_mmap_tx_job(timer_s *timer) struct tpacket2_hdr* tphdr; uint8_t *frame_ptr; - uint32_t stream_packets = 0; + bbl_stream_s *stream = NULL; + uint16_t burst = interface->config->io_burst; + bool ctrl = true; bool pcap = false; @@ -132,8 +134,6 @@ io_packet_mmap_tx_job(timer_s *timer) assert(io->direction == IO_EGRESS); assert(io->thread == NULL); - io_update_stream_token_bucket(io); - frame_ptr = io->ring + (io->cursor * io->req.tp_frame_size); tphdr = (struct tpacket2_hdr*)frame_ptr; if(tphdr->tp_status != TP_STATUS_AVAILABLE) { @@ -145,7 +145,7 @@ io_packet_mmap_tx_job(timer_s *timer) //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); io->timestamp.tv_sec = timer->timestamp->tv_sec; io->timestamp.tv_nsec = timer->timestamp->tv_nsec; - while(true) { + while(burst) { /* Check if this slot available for writing. */ if(tphdr->tp_status != TP_STATUS_AVAILABLE) { io->stats.no_buffer++; @@ -160,21 +160,20 @@ io_packet_mmap_tx_job(timer_s *timer) continue; } } else { - /* Send traffic streams up to allowed burst. */ - if(++stream_packets > io->stream_burst) { + stream = bbl_stream_io_send_iter(io); + if(!stream) { break; } - if(bbl_stream_tx(io, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { - break; - } - } - + memcpy(io->buf, stream->tx_buf, stream->tx_len); + io->buf_len = stream->tx_len; + } tphdr->tp_len = io->buf_len; tphdr->tp_status = TP_STATUS_SEND_REQUEST; io->queued++; io->stats.packets++; io->stats.bytes += io->buf_len; + burst--; /* Dump the packet into pcap file. */ if(g_ctx->pcap.write_buf && (ctrl || g_ctx->pcap.include_streams)) { @@ -232,8 +231,9 @@ io_packet_mmap_thread_rx_run_fn(io_thread_s *thread) tphdr = (struct tpacket2_hdr*)frame_ptr; if(!(tphdr->tp_status & TP_STATUS_USER)) { /* If no buffer is available poll kernel */ - poll_kernel(io, POLLIN); - sleep.tv_nsec = 100000; /* 0.1ms */ + //poll_kernel(io, POLLIN); + //sleep.tv_nsec = 100000; /* 0.1ms */ + sleep.tv_nsec = 1000; /* 0.001ms */ nanosleep(&sleep, &rem); continue; } @@ -259,13 +259,9 @@ io_packet_mmap_thread_rx_run_fn(io_thread_s *thread) } } -/** - * This job is for PACKET_MMAP TX in worker thread! - */ void -io_packet_mmap_thread_tx_job(timer_s *timer) +io_packet_mmap_thread_tx_run_fn(io_thread_s *thread) { - io_thread_s *thread = timer->data; io_handle_s *io = thread->io; bbl_interface_s *interface = io->interface; @@ -275,30 +271,41 @@ io_packet_mmap_thread_tx_job(timer_s *timer) struct tpacket2_hdr* tphdr; uint8_t *frame_ptr; - uint32_t stream_packets = 0; + bbl_stream_s *stream = NULL; + uint16_t io_burst = interface->config->io_burst; + uint16_t burst = 0; + bool ctrl = true; + struct timespec sleep, rem; + sleep.tv_sec = 0; + sleep.tv_nsec = 10; + assert(io->mode == IO_MODE_PACKET_MMAP); assert(io->direction == IO_EGRESS); assert(io->thread); - io_update_stream_token_bucket(io); + while(thread->active) { + nanosleep(&sleep, &rem); + + frame_ptr = io->ring + (io->cursor * io->req.tp_frame_size); + tphdr = (struct tpacket2_hdr *)frame_ptr; + if(tphdr->tp_status != TP_STATUS_AVAILABLE) { + /* If no buffer is available poll kernel. */ + io->stats.no_buffer++; + poll_kernel(io, POLLOUT); + continue; + } - frame_ptr = io->ring + (io->cursor * io->req.tp_frame_size); - tphdr = (struct tpacket2_hdr *)frame_ptr; - if(tphdr->tp_status != TP_STATUS_AVAILABLE) { - /* If no buffer is available poll kernel. */ - poll_kernel(io, POLLOUT); - io->stats.no_buffer++; - } else { /* Get TX timestamp */ - //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); - io->timestamp.tv_sec = timer->timestamp->tv_sec; - io->timestamp.tv_nsec = timer->timestamp->tv_nsec; - while(true) { - /* Check if this slot available for writing. */ + clock_gettime(CLOCK_MONOTONIC, &io->timestamp); + + burst = io_burst; + ctrl = true; + while(burst) { if(tphdr->tp_status != TP_STATUS_AVAILABLE) { io->stats.no_buffer++; + poll_kernel(io, POLLOUT); break; } io->buf = frame_ptr + TPACKET2_HDRLEN - sizeof(struct sockaddr_ll); @@ -316,12 +323,12 @@ io_packet_mmap_thread_tx_job(timer_s *timer) } } else { /* Send traffic streams up to allowed burst. */ - if(++stream_packets > io->stream_burst) { - break; - } - if(bbl_stream_tx(io, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { + stream = bbl_stream_io_send_iter(io); + if(!stream) { break; } + memcpy(io->buf, stream->tx_buf, stream->tx_len); + io->buf_len = stream->tx_len; } tphdr->tp_len = io->buf_len; @@ -330,22 +337,23 @@ io_packet_mmap_thread_tx_job(timer_s *timer) io->queued++; io->stats.packets++; io->stats.bytes += io->buf_len; + burst--; /* Get next slot. */ io->cursor = (io->cursor + 1) % io->req.tp_frame_nr; frame_ptr = io->ring + (io->cursor * io->req.tp_frame_size); tphdr = (struct tpacket2_hdr *)frame_ptr; } - } - if(io->queued) { - /* Notify kernel. */ - if(sendto(io->fd, NULL, 0, 0, NULL, 0) < 0) { - LOG(IO, "PACKET_MMAP sendto on interface %s failed with error %s (%d)\n", - interface->name, strerror(errno), errno); - io->stats.io_errors++; - } else { - io->queued = 0; + if(io->queued) { + /* Notify kernel. */ + if(sendto(io->fd, NULL, 0, 0, NULL, 0) < 0) { + LOG(IO, "PACKET_MMAP sendto on interface %s failed with error %s (%d)\n", + interface->name, strerror(errno), errno); + io->stats.io_errors++; + } else { + io->queued = 0; + } } } } @@ -366,9 +374,7 @@ io_packet_mmap_init(io_handle_s *io) if(io->direction == IO_INGRESS) { thread->run_fn = io_packet_mmap_thread_rx_run_fn; } else { - timer_add_periodic(&thread->timer.root, &thread->timer.io, "TX (threaded)", 0, - config->tx_interval, thread, &io_packet_mmap_thread_tx_job); - thread->timer.io->reset = false; + thread->run_fn = io_packet_mmap_thread_tx_run_fn; } } else { if(io->direction == IO_INGRESS) { diff --git a/code/bngblaster/src/io/io_raw.c b/code/bngblaster/src/io/io_raw.c index fa55d451..2e8e2ee4 100644 --- a/code/bngblaster/src/io/io_raw.c +++ b/code/bngblaster/src/io/io_raw.c @@ -89,7 +89,9 @@ io_raw_tx_job(timer_s *timer) io_handle_s *io = timer->data; bbl_interface_s *interface = io->interface; - uint32_t stream_packets = 0; + bbl_stream_s *stream = NULL; + uint16_t burst = interface->config->io_burst; + bool ctrl = true; bool pcap = false; @@ -97,47 +99,41 @@ io_raw_tx_job(timer_s *timer) assert(io->direction == IO_EGRESS); assert(io->thread == NULL); - io_update_stream_token_bucket(io); - /* Get TX timestamp */ //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); io->timestamp.tv_sec = timer->timestamp->tv_sec; io->timestamp.tv_nsec = timer->timestamp->tv_nsec; - while(true) { + while(burst) { /* If sendto fails, the failed packet remains in TX buffer to be retried * in the next interval. */ - if(io->buf_len) { - if(packet_is_bbl(io->buf, io->buf_len)) { - /* Update timestamp if BBL traffic is retried. */ - *(uint32_t*)(io->buf + (io->buf_len - 8)) = io->timestamp.tv_sec; - *(uint32_t*)(io->buf + (io->buf_len - 4)) = io->timestamp.tv_nsec; - } - } else if(ctrl) { - /* First send all control traffic which has higher priority. */ - if(bbl_tx(interface, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { - io->buf_len = 0; - ctrl = false; - continue; - } - } else { - /* Send traffic streams up to allowed burst. */ - if(++stream_packets > io->stream_burst) { - break; - } - if(bbl_stream_tx(io, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { - break; + if(io->buf_len == 0) { + if(ctrl) { + /* First send all control traffic which has higher priority. */ + if(bbl_tx(interface, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { + io->buf_len = 0; + ctrl = false; + continue; + } + } else { + /* Send traffic streams up to allowed burst. */ + stream = bbl_stream_io_send_iter(io); + if(!stream) { + break; + } + memcpy(io->buf, stream->tx_buf, stream->tx_len); + io->buf_len = stream->tx_len; } } - if(sendto(io->fd, io->buf, io->buf_len, 0, (struct sockaddr*)&io->addr, sizeof(struct sockaddr_ll)) <0 ) { if(errno == EMSGSIZE) { io_raw_tx_lo_long(io); } else { - /* This packet will be retried next interval - * because io->buf_len is not reset to zero. */ LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", interface->name, strerror(errno), errno); io->stats.io_errors++; + /* This packet will be retried next interval + * because io->buf_len is not reset to zero. */ + if(stream) io->buf_len = 0; /* Do not retry stream packets. */ break; } } else { @@ -147,9 +143,13 @@ io_raw_tx_job(timer_s *timer) pcapng_push_packet_header(&io->timestamp, io->buf, io->buf_len, interface->ifindex, PCAPNG_EPB_FLAGS_OUTBOUND); } + if(stream) { + stream->tx_packets++; + stream->flow_seq++; + } io->stats.packets++; io->stats.bytes += io->buf_len; - + burst--; } io->buf_len = 0; } @@ -188,77 +188,77 @@ io_raw_thread_rx_run_fn(io_thread_s *thread) } } -/** - * This job is for RAW TX in worker thread! - */ void -io_raw_thread_tx_job(timer_s *timer) +io_raw_thread_tx_run_fn(io_thread_s *thread) { - io_thread_s *thread = timer->data; io_handle_s *io = thread->io; + bbl_interface_s *interface = io->interface; bbl_txq_s *txq = thread->txq; bbl_txq_slot_t *slot; - uint32_t stream_packets = 0; + bbl_stream_s *stream = NULL; + uint16_t io_burst = interface->config->io_burst; + uint16_t burst = 0; + + struct timespec sleep, rem; + sleep.tv_sec = 0; + sleep.tv_nsec = 1000 * io_burst; assert(io->mode == IO_MODE_RAW); assert(io->direction == IO_EGRESS); assert(io->thread); - io_update_stream_token_bucket(io); + while(thread->active) { + nanosleep(&sleep, &rem); + burst = io_burst; - /* First send all control traffic which has higher priority. */ - while((slot = bbl_txq_read_slot(txq))) { - /* This packet will be retried next interval - * because slot is not marked as read. */ - if(sendto(io->fd, slot->packet, slot->packet_len, 0, (struct sockaddr*)&io->addr, sizeof(struct sockaddr_ll)) <0 ) { - LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", - io->interface->name, strerror(errno), errno); - io->stats.io_errors++; - return; + /* First send all control traffic which has higher priority. */ + while((slot = bbl_txq_read_slot(txq))) { + /* This packet will be retried next interval + * because slot is not marked as read. */ + if(sendto(io->fd, slot->packet, slot->packet_len, 0, (struct sockaddr*)&io->addr, sizeof(struct sockaddr_ll)) <0 ) { + LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", + io->interface->name, strerror(errno), errno); + io->stats.io_errors++; + nanosleep(&sleep, &rem); + continue; + } + io->stats.packets++; + io->stats.bytes += slot->packet_len; + bbl_txq_read_next(txq); + if(burst) burst--; } - io->stats.packets++; - io->stats.bytes += slot->packet_len; - bbl_txq_read_next(txq); - } - /* Get TX timestamp */ - //clock_gettime(CLOCK_MONOTONIC, &io->timestamp); - io->timestamp.tv_sec = timer->timestamp->tv_sec; - io->timestamp.tv_nsec = timer->timestamp->tv_nsec; + /* Get TX timestamp */ + clock_gettime(CLOCK_MONOTONIC, &io->timestamp); - /* Send traffic streams up to allowed burst. */ - while(stream_packets++ < io->stream_burst) { - /* If sendto fails, the failed packet remains in TX buffer - * to be retried in the next interval. */ - if(io->buf_len) { - if(packet_is_bbl(io->buf, io->buf_len)) { - /* Update timestamp if BBL traffic is retried. */ - *(uint32_t*)(io->buf + (io->buf_len - 8)) = io->timestamp.tv_sec; - *(uint32_t*)(io->buf + (io->buf_len - 4)) = io->timestamp.tv_nsec; - } - } else { - if(bbl_stream_tx(io, io->buf, &io->buf_len) != PROTOCOL_SUCCESS) { + while(burst) { + /* Send traffic streams up to allowed burst. */ + stream = bbl_stream_io_send_iter(io); + if(!stream) { break; } - } - if(sendto(io->fd, io->buf, io->buf_len, 0, (struct sockaddr*)&io->addr, sizeof(struct sockaddr_ll)) <0 ) { - if(errno == EMSGSIZE) { - io_raw_tx_lo_long(io); + if(sendto(io->fd, stream->tx_buf, stream->tx_len, 0, (struct sockaddr*)&io->addr, sizeof(struct sockaddr_ll)) <0 ) { + if(errno == EMSGSIZE) { + io_raw_tx_lo_long(io); + } else { + /* This packet will be retried next interval + * because io->buf_len is not reset to zero. */ + LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", + io->interface->name, strerror(errno), errno); + io->stats.io_errors++; + burst = 0; + break; + } } else { - /* This packet will be retried next interval - * because io->buf_len is not reset to zero. */ - LOG(IO, "RAW sendto on interface %s failed with error %s (%d)\n", - io->interface->name, strerror(errno), errno); - io->stats.io_errors++; - break; + stream->tx_packets++; + stream->flow_seq++; + io->stats.packets++; + io->stats.bytes += io->buf_len; + burst--; } - } else { - io->stats.packets++; - io->stats.bytes += io->buf_len; } - io->buf_len = 0; } } @@ -280,9 +280,7 @@ io_raw_init(io_handle_s *io) if(io->direction == IO_INGRESS) { thread->run_fn = io_raw_thread_rx_run_fn; } else { - timer_add_periodic(&thread->timer.root, &thread->timer.io, "TX (threaded)", 0, - config->tx_interval, thread, &io_raw_thread_tx_job); - thread->timer.io->reset = false; + thread->run_fn = io_raw_thread_tx_run_fn; } } else { if(io->direction == IO_INGRESS) { diff --git a/code/bngblaster/src/io/io_thread.c b/code/bngblaster/src/io/io_thread.c index 01b0ad3b..a0201b96 100644 --- a/code/bngblaster/src/io/io_thread.c +++ b/code/bngblaster/src/io/io_thread.c @@ -62,7 +62,7 @@ io_thread_rx_handler(io_thread_s *thread, io_handle_s *io) io->stats.packets++; io->stats.bytes += io->buf_len; - if(likely(packet_is_bbl(io->buf, io->buf_len))) { + if(packet_is_bbl(io->buf, io->buf_len)) { /** Process */ decode_result = decode_ethernet(io->buf, io->buf_len, thread->sp, SCRATCHPAD_LEN, ð); if(decode_result == PROTOCOL_SUCCESS) { @@ -211,17 +211,6 @@ io_thread_main(void *thread_data) return NULL; } -void -io_thread_timer_loop(io_thread_s *thread) -{ - pthread_mutex_lock(&thread->mutex); - timer_smear_all_buckets(&thread->timer.root); - pthread_mutex_unlock(&thread->mutex); - while(thread->active) { - timer_walk(&thread->timer.root); - } -} - bool io_thread_init(io_handle_s *io) { @@ -262,11 +251,8 @@ io_thread_init(io_handle_s *io) return false; } - /* Init thread timer root */ - timer_init_root(&thread->timer.root); - /* Default run function which might be overwritten */ - thread->run_fn = io_thread_timer_loop; + thread->run_fn = NULL; /* Add thread main loop timers/jobs */ if(io->direction == IO_INGRESS && !interface->io.rx_job) { @@ -313,7 +299,6 @@ io_thread_start_all() sleep.tv_nsec = 7800179; /* prime number between 7 and 8ms */ while(thread) { thread->active = true; - timer_smear_all_buckets(&thread->timer.root); pthread_create(&thread->thread, NULL, io_thread_main, (void *)thread); if(thread->set_cpu_affinity) { if(pthread_setaffinity_np(thread->thread, sizeof(cpu_set_t), &thread->cpuset)) {