From 3559938cc5cc81f57fe9e48eff155a8ddcc78bd2 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Tue, 19 Nov 2024 09:04:54 +0000 Subject: [PATCH 1/4] Use mutex to protect the sessions context list Signed-off-by: Kasiewicz, Marek --- media-proxy/include/proxy_context.h | 2 ++ media-proxy/src/api_server_tcp.cc | 5 ++++- media-proxy/src/proxy_context.cc | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/media-proxy/include/proxy_context.h b/media-proxy/include/proxy_context.h index 04b803d7..88526217 100644 --- a/media-proxy/include/proxy_context.h +++ b/media-proxy/include/proxy_context.h @@ -17,6 +17,7 @@ #include "session-mtl.h" #include "session-rdma.h" +#include #pragma once @@ -34,6 +35,7 @@ class ProxyContext { std::vector mDpCtx; mtl_handle mDevHandle = NULL; libfabric_ctx *mDevHandle_rdma = NULL; + std::mutex ctx_mtx; bool imtl_init_preparing; pthread_mutex_t mutex_lock; diff --git a/media-proxy/src/api_server_tcp.cc b/media-proxy/src/api_server_tcp.cc index a086ff6b..1f44e8b3 100644 --- a/media-proxy/src/api_server_tcp.cc +++ b/media-proxy/src/api_server_tcp.cc @@ -127,11 +127,13 @@ void* msg_loop(void* ptr) /* TODO: return memdif ID */ break; case MCM_QUERY_MEMIF_PARAM: + { DEBUG("MCM_QUERY_MEMIF_PARAM: Case entry."); if (buffer == NULL || msg.command.data_len < 4) { INFO("Invalid parameters."); break; } + std::lock_guard lock(proxy_ctx->ctx_mtx); session_id = *(uint32_t*)buffer; for (auto it : proxy_ctx->mDpCtx) { if (it->get_id() == session_id) { @@ -153,7 +155,8 @@ void* msg_loop(void* ptr) break; } } - break; + break; + } case MCM_DESTROY_SESSION: DEBUG("MCM_DESTROY_SESSION: Case entry."); if (buffer == NULL || msg.command.data_len < 4) { diff --git a/media-proxy/src/proxy_context.cc b/media-proxy/src/proxy_context.cc index 7b07f6d4..93d38c11 100644 --- a/media-proxy/src/proxy_context.cc +++ b/media-proxy/src/proxy_context.cc @@ -186,6 +186,7 @@ int ProxyContext::RxStart_rdma(const mcm_conn_param *request) } INFO("%s, session id: %d", __func__, session_ptr->get_id()); + std::lock_guard lock(ctx_mtx); mDpCtx.push_back(session_ptr); return session_ptr->get_id(); } @@ -251,6 +252,7 @@ int ProxyContext::RxStart_mtl(const mcm_conn_param *request) } INFO("%s, session id: %d", __func__, session_ptr->get_id()); + std::lock_guard lock(ctx_mtx); mDpCtx.push_back(session_ptr); return session_ptr->get_id(); } @@ -287,6 +289,7 @@ int ProxyContext::TxStart_rdma(const mcm_conn_param *request) } INFO("%s, session id: %d", __func__, session_ptr->get_id()); + std::lock_guard lock(ctx_mtx); mDpCtx.push_back(session_ptr); return session_ptr->get_id(); } @@ -351,6 +354,7 @@ int ProxyContext::TxStart_mtl(const mcm_conn_param *request) } INFO("%s, session id: %d", __func__, session_ptr->get_id()); + std::lock_guard lock(ctx_mtx); mDpCtx.push_back(session_ptr); return session_ptr->get_id(); } @@ -366,6 +370,7 @@ int ProxyContext::TxStart(const mcm_conn_param *request) int ProxyContext::Stop(const int32_t session_id) { int ret = 0; + std::lock_guard lock(ctx_mtx); auto ctx = std::find_if(mDpCtx.begin(), mDpCtx.end(), [session_id](auto it) { return it->get_id() == session_id; }); if (ctx != mDpCtx.end()) { From a32dda1f064bcb99bfb3630782690ed358b16884 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Tue, 19 Nov 2024 09:04:29 +0000 Subject: [PATCH 2/4] Add pingpong apps capable of measuring latency Signed-off-by: Kasiewicz, Marek --- media-proxy/src/api_server_tcp.cc | 5 +- sdk/samples/CMakeLists.txt | 8 + sdk/samples/ping_app.c | 399 ++++++++++++++++++ sdk/samples/pingpong_common.c | 127 ++++++ sdk/samples/pingpong_common.h | 53 +++ sdk/samples/pong_app.c | 319 ++++++++++++++ .../test-rdma-latency.sh | 178 ++++++++ 7 files changed, 1086 insertions(+), 3 deletions(-) create mode 100644 sdk/samples/ping_app.c create mode 100644 sdk/samples/pingpong_common.c create mode 100644 sdk/samples/pingpong_common.h create mode 100644 sdk/samples/pong_app.c create mode 100755 tests/single-node-sample-apps/test-rdma-latency.sh diff --git a/media-proxy/src/api_server_tcp.cc b/media-proxy/src/api_server_tcp.cc index 1f44e8b3..2132f9da 100644 --- a/media-proxy/src/api_server_tcp.cc +++ b/media-proxy/src/api_server_tcp.cc @@ -126,8 +126,7 @@ void* msg_loop(void* ptr) DEBUG("MCM_QUERY_MEMIF_ID: Case entry."); /* TODO: return memdif ID */ break; - case MCM_QUERY_MEMIF_PARAM: - { + case MCM_QUERY_MEMIF_PARAM: { DEBUG("MCM_QUERY_MEMIF_PARAM: Case entry."); if (buffer == NULL || msg.command.data_len < 4) { INFO("Invalid parameters."); @@ -155,7 +154,7 @@ void* msg_loop(void* ptr) break; } } - break; + break; } case MCM_DESTROY_SESSION: DEBUG("MCM_DESTROY_SESSION: Case entry."); diff --git a/sdk/samples/CMakeLists.txt b/sdk/samples/CMakeLists.txt index 40fc55de..39a8add9 100644 --- a/sdk/samples/CMakeLists.txt +++ b/sdk/samples/CMakeLists.txt @@ -17,3 +17,11 @@ target_link_libraries(sender_app PRIVATE mcm_dp) add_executable(recver_app recver_app.c) target_include_directories(recver_app PRIVATE ../include) target_link_libraries(recver_app PRIVATE mcm_dp) + +add_executable(ping_app ping_app.c pingpong_common.c) +target_include_directories(ping_app PRIVATE ../include) +target_link_libraries(ping_app PRIVATE mcm_dp) + +add_executable(pong_app pong_app.c pingpong_common.c) +target_include_directories(pong_app PRIVATE ../include) +target_link_libraries(pong_app PRIVATE mcm_dp) diff --git a/sdk/samples/ping_app.c b/sdk/samples/ping_app.c new file mode 100644 index 00000000..dee4d8bd --- /dev/null +++ b/sdk/samples/ping_app.c @@ -0,0 +1,399 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mesh_dp.h" +#include +#include +#include +#include "pingpong_common.h" + +#define DEFAULT_RECV_PORT "9001" +#define DEFAULT_SEND_PORT "10001" + +static volatile bool keepRunning = true; + +static MeshClient *client; +atomic_int counter = 0; + + +Config config = { + .recv_addr = DEFAULT_RECV_IP, + .recv_port = DEFAULT_RECV_PORT, + .send_addr = DEFAULT_SEND_IP, + .send_port = DEFAULT_SEND_PORT, + + .payload_type = "", + .protocol_type = "", + .pix_fmt_string = DEFAULT_VIDEO_FMT, + .socket_path = DEFAULT_MEMIF_SOCKET_PATH, + .interface_id = DEFAULT_MEMIF_INTERFACE_ID, + .loop = DEFAULT_INFINITE_LOOP, + + .width = DEFAULT_FRAME_WIDTH, + .height = DEFAULT_FRAME_HEIGHT, + .vid_fps = DEFAULT_FPS, + .frame_size = 0, + .total_num = 300 +}; + +void intHandler(int dummy) +{ + keepRunning = 0; +} + +/* print a description of all supported options */ +void usage(FILE *fp, const char *path) +{ + /* take only the last portion of the path */ + const char *basename = strrchr(path, '/'); + basename = basename ? basename + 1 : path; + + fprintf(fp, "usage: %s [OPTION]\n", basename); + fprintf(fp, "-H, --help\t\t\t" + "Print this help and exit\n"); + fprintf(fp, "-w, --width=\t" + "Width of test video frame (default: %d)\n", + DEFAULT_FRAME_WIDTH); + fprintf(fp, "-h, --height=\t" + "Height of test video frame (default: %d)\n", + DEFAULT_FRAME_HEIGHT); + fprintf(fp, "-f, --fps=\t\t" + "Test video FPS (frame per second) (default: %0.2f)\n", + DEFAULT_FPS); + fprintf(fp, "-s, --ip=ip_address\t\t" + "Send data to IP address (default: %s)\n", + DEFAULT_SEND_IP); + fprintf(fp, "-p, --port=port_number\t\t" + "Send data to Port (default: %s)\n", + DEFAULT_SEND_PORT); + fprintf(fp, "-o, --protocol=protocol_type\t" + "Set protocol type (default: %s)\n", + DEFAULT_PROTOCOL); + fprintf(fp, "-n, --number=frame_number\t" + "Total frame number to send (default: %d)\n", + DEFAULT_TOTAL_NUM); + fprintf(fp, "-s, --socketpath=socket_path\t" + "Set memif socket path (default: %s)\n", + DEFAULT_MEMIF_SOCKET_PATH); + fprintf(fp, "-d, --interfaceid=interface_id\t" + "Set memif conn interface id (default: %d)\n", + DEFAULT_MEMIF_INTERFACE_ID); + fprintf(fp, "-l, --loop=is_loop\t" + "Set infinite loop sending (default: %d)\n", + DEFAULT_INFINITE_LOOP); + fprintf(fp, "-t, --threads=threads_num\t" + "Set threads_num (default: %d)\n", + 1); + fprintf(fp, "\n"); +} + +typedef struct { + int thread_id; + double *latency_results; +} t_data; + +void *receiver_thread(void *arg) +{ + t_data *thread_data = (t_data *)arg; + struct timespec send_time = {}, now = {}; + int recved_pkt_num; + cpu_set_t cpuset; + int err; + int timeout_ms = -1; + + MeshConnection *conn; + MeshBuffer *buf; + + err = mesh_create_connection(client, &conn); + if (err) { + printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); + pthread_exit(NULL); + } + + err = init_conn(conn, &config, MESH_CONN_KIND_RECEIVER, thread_data->thread_id); + if (err) { + printf("ERROR: init_conn failed \n"); + mesh_delete_connection(&conn); + pthread_exit(NULL); + } + + if (thread_data->thread_id == 0) + config.frame_size = conn->buf_size; + + // Pin the thread to specific CPU core + CPU_ZERO(&cpuset); + CPU_SET((thread_data->thread_id + config.threads_num) % CPU_CORES, &cpuset); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { + perror("pthread_setaffinity_np"); + pthread_exit(NULL); + } + + while (1) { + err = mesh_get_buffer_timeout(conn, &buf, timeout_ms); + if (err == -MESH_ERR_CONN_CLOSED) { + printf("Connection closed\n"); + break; + } + if (err) { + printf("Failed to get buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + timeout_ms = 6000; + + memcpy(&send_time, buf->data, sizeof(send_time)); + memcpy(&recved_pkt_num, buf->data + sizeof(send_time), sizeof(recved_pkt_num)); + clock_gettime(CLOCK_REALTIME, &now); + double latency_us = 1000000.0 * (now.tv_sec - send_time.tv_sec); + latency_us += (now.tv_nsec - send_time.tv_nsec) / 1000.0; + thread_data->latency_results[recved_pkt_num] = latency_us; + + err = mesh_put_buffer(&buf); + if (err) { + printf("Failed to put buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + } + + /* Clean up */ + err = mesh_delete_connection(&conn); + if (err) + printf("Failed to delete connection: %s (%d)\n", mesh_err2str(err), err); + + pthread_exit(NULL); +} + +void *sender_thread(void *arg) +{ + int thread_id = *(int *)arg; + struct timespec send_time = {}; + cpu_set_t cpuset; + int err; + + MeshConnection *conn; + MeshBuffer *buf; + + err = mesh_create_connection(client, &conn); + if (err) { + printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); + pthread_exit(NULL); + } + + err = init_conn(conn, &config, MESH_CONN_KIND_SENDER, thread_id); + if (err) { + printf("ERROR: init_conn failed \n"); + mesh_delete_connection(&conn); + pthread_exit(NULL); + } + + // Pin the thread to specific CPU core + CPU_ZERO(&cpuset); + CPU_SET(thread_id % CPU_CORES, &cpuset); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { + perror("pthread_setaffinity_np"); + pthread_exit(NULL); + } + + // Wait and send buffers based on atomic counter value + for (int i = 0; i < TRANSFERS_NUM; i++) { + + err = mesh_get_buffer(conn, &buf); + if (err) { + printf("Failed to get buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + while (atomic_load(&counter) == i); // Spin until the counter increments + clock_gettime(CLOCK_REALTIME, &send_time); + memcpy(buf->data, &send_time, sizeof(send_time)); + memcpy(buf->data + sizeof(send_time), &i, sizeof(i)); + + err = mesh_put_buffer(&buf); + if (err) { + printf("Failed to put buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + } + + /* Clean up */ + err = mesh_delete_connection(&conn); + if (err) + printf("Failed to delete connection: %s (%d)\n", mesh_err2str(err), err); + + pthread_exit(NULL); +} + +int main(int argc, char **argv) +{ + int err; + + int help_flag = 0; + int opt; + struct option longopts[] = { + { "help", no_argument, &help_flag, 'H' }, + { "width", required_argument, NULL, 'w' }, + { "height", required_argument, NULL, 'h' }, + { "fps", required_argument, NULL, 'f' }, + { "rcv_ip", required_argument, NULL, 'r' }, + { "rcv_port", required_argument, NULL, 'i' }, + { "send_ip", required_argument, NULL, 's' }, + { "send_port", required_argument, NULL, 'p' }, + { "protocol", required_argument, NULL, 'o' }, + { "number", required_argument, NULL, 'n' }, + { "file", required_argument, NULL, 'b' }, + { "type", required_argument, NULL, 't' }, + { "socketpath", required_argument, NULL, 'k' }, + { "interfaceid", required_argument, NULL, 'd' }, + { "loop", required_argument, NULL, 'l' }, + { "pix_fmt", required_argument, NULL, 'x' }, + { "threads_num", required_argument, NULL, 'm' }, + { 0 } + }; + + /* infinite loop, to be broken when we are done parsing options */ + while (1) { + opt = getopt_long(argc, argv, "Hw:h:f:s:p:o:n:r:i:t:k:d:l:x:b:m:", longopts, 0); + if (opt == -1) { + break; + } + + switch (opt) { + case 'H': + help_flag = 1; + break; + case 'w': + config.width = atoi(optarg); + break; + case 'h': + config.height = atoi(optarg); + break; + case 'f': + config.vid_fps = atof(optarg); + break; + case 'r': + strlcpy(config.recv_addr, optarg, sizeof(config.recv_addr)); + break; + case 'i': + strlcpy(config.recv_port, optarg, sizeof(config.recv_port)); + break; + case 's': + strlcpy(config.send_addr, optarg, sizeof(config.send_addr)); + break; + case 'p': + strlcpy(config.send_port, optarg, sizeof(config.send_port)); + break; + case 'o': + strlcpy(config.protocol_type, optarg, sizeof(config.protocol_type)); + break; + case 'n': + config.total_num = atoi(optarg); + break; + case 't': + strlcpy(config.payload_type, optarg, sizeof(config.payload_type)); + break; + case 'k': + strlcpy(config.socket_path, optarg, sizeof(config.socket_path)); + break; + case 'd': + config.interface_id = atoi(optarg); + break; + case 'l': + config.loop = (atoi(optarg) > 0); + break; + case 'x': + strlcpy(config.pix_fmt_string, optarg, sizeof(config.pix_fmt_string)); + break; + case 'm': + config.threads_num = atoi(optarg); + break; + case '?': + usage(stderr, argv[0]); + return 1; + default: + break; + } + } + + if (help_flag) { + usage(stdout, argv[0]); + return 0; + } + signal(SIGINT, intHandler); + + err = mesh_create_client(&client, NULL); + if (err) { + printf("Failed to create a mesh client: %s (%d)\n", mesh_err2str(err), err); + exit(-1); + } + + pthread_t *sender_threads = malloc(config.threads_num * sizeof(pthread_t)); + pthread_t *receiver_threads = malloc(config.threads_num * sizeof(pthread_t)); + t_data *recv_threads_data = malloc(config.threads_num * sizeof(t_data)); + int *sender_threads_id = malloc(config.threads_num * sizeof(int)); + + for (int i = 0; i < config.threads_num; i++) { + recv_threads_data[i].latency_results = + malloc(sizeof(*(recv_threads_data[i].latency_results)) * TRANSFERS_NUM); + recv_threads_data[i].thread_id = i; + if (pthread_create(&receiver_threads[i], NULL, receiver_thread, &recv_threads_data[i]) != + 0) { + perror("pthread_create"); + return 1; + } + usleep(100000); + sender_threads_id[i] = i; + if (pthread_create(&sender_threads[i], NULL, sender_thread, &sender_threads_id[i]) != 0) { + perror("pthread_create"); + return 1; + } + usleep(100000); + } + + // time to setup + sleep(4); + + // Increment atomic counter and signal threads + for (int i = 1; i <= TRANSFERS_NUM; i++) { + usleep(1000000.0 / config.vid_fps); // Delay to synchronize sending buffers + printf("Sending buffers number %d\n", i); + atomic_store(&counter, i); + } + + // Wait for all threads to finish + for (int i = 0; i < config.threads_num; i++) { + pthread_join(receiver_threads[i], NULL); + pthread_join(sender_threads[i], NULL); + } + + printf("thread_number; measurement_number; number_of_threads; request_size; latency\n\n"); + for (int i = 0; i < config.threads_num; i++) { + for (int j = 0; j < TRANSFERS_NUM; j++) { + printf("%d; %d; %d; %d; %lf \n", i, j, config.threads_num, config.frame_size, + recv_threads_data[i].latency_results[j]); + } + free(recv_threads_data[i].latency_results); + } + + free(sender_threads_id); + free(sender_threads); + free(receiver_threads); + free(recv_threads_data); + + mesh_delete_client(&client); + + exit(-1); +} diff --git a/sdk/samples/pingpong_common.c b/sdk/samples/pingpong_common.c new file mode 100644 index 00000000..94bd8a6b --- /dev/null +++ b/sdk/samples/pingpong_common.c @@ -0,0 +1,127 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +#include "pingpong_common.h" + +int init_conn(MeshConnection *conn, Config *config, int kind, int id) +{ + int err; + + /* protocol type */ + if (!strcmp(config->protocol_type, "memif")) { + MeshConfig_Memif cfg; + + strlcpy(cfg.socket_path, config->socket_path, sizeof(cfg.socket_path)); + cfg.interface_id = config->interface_id; + + err = mesh_apply_connection_config_memif(conn, &cfg); + if (err) { + printf("Failed to apply memif configuration: %s (%d)\n", mesh_err2str(err), err); + return -1; + } + } else { + if (!strcmp(config->payload_type, "rdma")) { + MeshConfig_RDMA cfg; + + strlcpy(cfg.remote_ip_addr, config->send_addr, sizeof(cfg.remote_ip_addr)); + cfg.remote_port = atoi(config->send_port) + id; + strlcpy(cfg.local_ip_addr, config->recv_addr, sizeof(cfg.local_ip_addr)); + cfg.local_port = atoi(config->recv_port) + id; + + err = mesh_apply_connection_config_rdma(conn, &cfg); + if (err) { + printf("Failed to apply RDMA configuration: %s (%d)\n", mesh_err2str(err), err); + return -1; + } + } else { + MeshConfig_ST2110 cfg; + + strlcpy(cfg.remote_ip_addr, config->send_addr, sizeof(cfg.remote_ip_addr)); + cfg.remote_port = atoi(config->send_port) + id; + strlcpy(cfg.local_ip_addr, config->recv_addr, sizeof(cfg.local_ip_addr)); + cfg.local_port = atoi(config->recv_port) + id; + + /* transport type */ + if (!strcmp(config->payload_type, "st20")) { + cfg.transport = MESH_CONN_TRANSPORT_ST2110_20; + } else if (!strcmp(config->payload_type, "st22")) { + cfg.transport = MESH_CONN_TRANSPORT_ST2110_22; + } else if (!strcmp(config->payload_type, "st30")) { + cfg.transport = MESH_CONN_TRANSPORT_ST2110_30; + } else { + printf("Unknown SMPTE ST2110 transport type: %s\n", config->payload_type); + return -1; + } + + err = mesh_apply_connection_config_st2110(conn, &cfg); + if (err) { + printf("Failed to apply SMPTE ST2110 configuration: %s (%d)\n", mesh_err2str(err), + err); + return -1; + } + } + } + + /* payload type */ + if (!strcmp(config->payload_type, "st20") || !strcmp(config->payload_type, "st22") || + !strcmp(config->payload_type, "rdma")) { + /* video */ + MeshConfig_Video cfg; + + if (!strncmp(config->pix_fmt_string, "yuv422p", sizeof(config->pix_fmt_string))) + cfg.pixel_format = MESH_VIDEO_PIXEL_FORMAT_YUV422P; + else if (!strncmp(config->pix_fmt_string, "yuv422p10le", sizeof(config->pix_fmt_string))) + cfg.pixel_format = MESH_VIDEO_PIXEL_FORMAT_YUV422P10LE; + else if (!strncmp(config->pix_fmt_string, "yuv444p10le", sizeof(config->pix_fmt_string))) + cfg.pixel_format = MESH_VIDEO_PIXEL_FORMAT_YUV444P10LE; + else if (!strncmp(config->pix_fmt_string, "rgb8", sizeof(config->pix_fmt_string))) + cfg.pixel_format = MESH_VIDEO_PIXEL_FORMAT_RGB8; + else + cfg.pixel_format = MESH_VIDEO_PIXEL_FORMAT_NV12; + + cfg.width = config->width; + cfg.height = config->height; + cfg.fps = config->vid_fps; + + err = mesh_apply_connection_config_video(conn, &cfg); + if (err) { + printf("Failed to apply video configuration: %s (%d)\n", mesh_err2str(err), err); + return -1; + } + } else if (!strcmp(config->payload_type, "st30")) { + /* audio */ + MeshConfig_Audio cfg; + + cfg.channels = 2; + cfg.format = MESH_AUDIO_FORMAT_PCM_S16BE; + cfg.sample_rate = MESH_AUDIO_SAMPLE_RATE_48000; + cfg.packet_time = MESH_AUDIO_PACKET_TIME_1MS; + + err = mesh_apply_connection_config_audio(conn, &cfg); + if (err) { + printf("Failed to apply audio configuration: %s (%d)\n", mesh_err2str(err), err); + return -1; + } + } else { + printf("Unknown payload type: %s\n", config->payload_type); + return -1; + } + + err = mesh_establish_connection(conn, kind); + if (err) { + printf("Failed to establish connection: %s (%d)\n", mesh_err2str(err), err); + return -1; + } + + return 0; +} diff --git a/sdk/samples/pingpong_common.h b/sdk/samples/pingpong_common.h new file mode 100644 index 00000000..8349741d --- /dev/null +++ b/sdk/samples/pingpong_common.h @@ -0,0 +1,53 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel CorporationPINGPONG_COMMON_H + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PINGPONG_COMMON_H +#define PINGPONG_COMMON_H + +#include "mesh_dp.h" + +#define DEFAULT_RECV_IP "127.0.0.1" +#define DEFAULT_SEND_IP "127.0.0.1" + +#define DEFAULT_TOTAL_NUM 300 +#define DEFAULT_FRAME_WIDTH 1920 +#define DEFAULT_FRAME_HEIGHT 1080 +#define DEFAULT_FPS 2.0 +#define DEFAULT_MEMIF_SOCKET_PATH "/run/mcm/mcm_rx_memif.sock" +#define DEFAULT_MEMIF_INTERFACE_ID 0 +#define DEFAULT_PROTOCOL "auto" +#define DEFAULT_INFINITE_LOOP 0 +#define DEFAULT_VIDEO_FMT "yuv422p10le" + +#define CPU_CORES 28 // Number of available CPU cores +#define TRANSFERS_NUM 16 // Number of buffers to sent + +typedef struct { + char recv_addr[46]; + char recv_port[6]; + char send_addr[46]; + char send_port[6]; + + char payload_type[32]; + char protocol_type[32]; + char pix_fmt_string[32]; + char socket_path[108]; + uint32_t interface_id; + bool loop; + + /* video resolution */ + uint32_t width; + uint32_t height; + double vid_fps; + uint32_t frame_size; + uint32_t total_num; + + int threads_num; +} Config; + +int init_conn(MeshConnection *conn, Config *config, int kind, int id); + +#endif /* PINGPONG_COMMON_H */ diff --git a/sdk/samples/pong_app.c b/sdk/samples/pong_app.c new file mode 100644 index 00000000..72d035c1 --- /dev/null +++ b/sdk/samples/pong_app.c @@ -0,0 +1,319 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mesh_dp.h" +#include +#include +#include +#include "pingpong_common.h" + +#define DEFAULT_RECV_PORT "10001" +#define DEFAULT_SEND_PORT "9001" + +static volatile bool keepRunning = true; + +static MeshClient *client; +atomic_int counter = 0; + + +Config config = { + .recv_addr = DEFAULT_RECV_IP, + .recv_port = DEFAULT_RECV_PORT, + .send_addr = DEFAULT_SEND_IP, + .send_port = DEFAULT_SEND_PORT, + + .payload_type = "", + .protocol_type = "", + .pix_fmt_string = DEFAULT_VIDEO_FMT, + .socket_path = DEFAULT_MEMIF_SOCKET_PATH, + .interface_id = DEFAULT_MEMIF_INTERFACE_ID, + .loop = DEFAULT_INFINITE_LOOP, + + .width = DEFAULT_FRAME_WIDTH, + .height = DEFAULT_FRAME_HEIGHT, + .vid_fps = DEFAULT_FPS, + .frame_size = 0, + .total_num = 300 +}; + +void intHandler(int dummy) { keepRunning = 0; } + +/* print a description of all supported options */ +void usage(FILE *fp, const char *path) +{ + /* take only the last portion of the path */ + const char *basename = strrchr(path, '/'); + basename = basename ? basename + 1 : path; + + fprintf(fp, "usage: %s [OPTION]\n", basename); + fprintf(fp, "-H, --help\t\t\t" + "Print this help and exit\n"); + fprintf(fp, "-w, --width=\t" + "Width of test video frame (default: %d)\n", + DEFAULT_FRAME_WIDTH); + fprintf(fp, "-h, --height=\t" + "Height of test video frame (default: %d)\n", + DEFAULT_FRAME_HEIGHT); + fprintf(fp, "-f, --fps=\t\t" + "Test video FPS (frame per second) (default: %0.2f)\n", + DEFAULT_FPS); + fprintf(fp, "-s, --ip=ip_address\t\t" + "Send data to IP address (default: %s)\n", + DEFAULT_SEND_IP); + fprintf(fp, "-p, --port=port_number\t\t" + "Send data to Port (default: %s)\n", + DEFAULT_SEND_PORT); + fprintf(fp, "-o, --protocol=protocol_type\t" + "Set protocol type (default: %s)\n", + DEFAULT_PROTOCOL); + fprintf(fp, "-n, --number=frame_number\t" + "Total frame number to send (default: %d)\n", + DEFAULT_TOTAL_NUM); + fprintf(fp, "-s, --socketpath=socket_path\t" + "Set memif socket path (default: %s)\n", + DEFAULT_MEMIF_SOCKET_PATH); + fprintf(fp, "-d, --interfaceid=interface_id\t" + "Set memif conn interface id (default: %d)\n", + DEFAULT_MEMIF_INTERFACE_ID); + fprintf(fp, "-l, --loop=is_loop\t" + "Set infinite loop sending (default: %d)\n", + DEFAULT_INFINITE_LOOP); + fprintf(fp, "\n"); +} + +void *pong_thread(void *arg) +{ + int thread_id = *(int *)arg; + cpu_set_t cpuset; + int err; + int timeout_ms = -1; + + MeshConnection *s_conn; + MeshConnection *r_conn; + MeshBuffer *s_buf; + MeshBuffer *r_buf; + + err = mesh_create_connection(client, &r_conn); + if (err) { + printf("Failed to create a recv mesh connection: %s (%d)\n", mesh_err2str(err), err); + pthread_exit(NULL); + } + + err = init_conn(r_conn, &config, MESH_CONN_KIND_RECEIVER, thread_id); + if (err) { + printf("ERROR: init_conn failed \n"); + mesh_delete_connection(&r_conn); + pthread_exit(NULL); + } + usleep(100000); + + err = mesh_create_connection(client, &s_conn); + if (err) { + printf("Failed to create a send mesh connection: %s (%d)\n", mesh_err2str(err), err); + pthread_exit(NULL); + } + + err = init_conn(s_conn, &config, MESH_CONN_KIND_SENDER, thread_id); + if (err) { + printf("ERROR: init_conn failed \n"); + mesh_delete_connection(&s_conn); + pthread_exit(NULL); + } + + // Pin the thread to specific CPU core + CPU_ZERO(&cpuset); + CPU_SET((thread_id + config.threads_num * 2) % CPU_CORES, &cpuset); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { + perror("pthread_setaffinity_np"); + pthread_exit(NULL); + } + + while (1) { + err = mesh_get_buffer_timeout(r_conn, &r_buf, timeout_ms); + if (err == -MESH_ERR_CONN_CLOSED) { + printf("Connection closed\n"); + break; + } + if (err) { + printf("Failed to get buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + timeout_ms = 1000000.0 / config.vid_fps + 1000; + + err = mesh_get_buffer(s_conn, &s_buf); + if (err) { + printf("Failed to get send buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + memcpy(s_buf->data, r_buf->data, sizeof(struct timespec) + sizeof(int)); + + err = mesh_put_buffer(&s_buf); + if (err) { + printf("Failed to put send buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + err = mesh_put_buffer(&r_buf); + if (err) { + printf("Failed to put recv buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + } + + sleep(1); + + /* Clean up */ + err = mesh_delete_connection(&s_conn); + if (err) + printf("Failed to delete send connection: %s (%d)\n", mesh_err2str(err), err); + err = mesh_delete_connection(&r_conn); + if (err) + printf("Failed to delete recv connection: %s (%d)\n", mesh_err2str(err), err); + + pthread_exit(NULL); +} + +int main(int argc, char **argv) +{ + int err; + int help_flag = 0; + int opt; + struct option longopts[] = { + { "help", no_argument, &help_flag, 'H' }, + { "width", required_argument, NULL, 'w' }, + { "height", required_argument, NULL, 'h' }, + { "fps", required_argument, NULL, 'f' }, + { "rcv_ip", required_argument, NULL, 'r' }, + { "rcv_port", required_argument, NULL, 'i' }, + { "send_ip", required_argument, NULL, 's' }, + { "send_port", required_argument, NULL, 'p' }, + { "protocol", required_argument, NULL, 'o' }, + { "number", required_argument, NULL, 'n' }, + { "file", required_argument, NULL, 'b' }, + { "type", required_argument, NULL, 't' }, + { "socketpath", required_argument, NULL, 'k' }, + { "interfaceid", required_argument, NULL, 'd' }, + { "loop", required_argument, NULL, 'l' }, + { "pix_fmt", required_argument, NULL, 'x' }, + { "threads_num", required_argument, NULL, 'm' }, + { 0 } + }; + + /* infinite loop, to be broken when we are done parsing options */ + while (1) { + opt = getopt_long(argc, argv, "Hw:h:f:s:p:o:n:r:i:t:k:d:l:x:b:m:", longopts, 0); + if (opt == -1) { + break; + } + + switch (opt) { + case 'H': + help_flag = 1; + break; + case 'w': + config.width = atoi(optarg); + break; + case 'h': + config.height = atoi(optarg); + break; + case 'f': + config.vid_fps = atof(optarg); + break; + case 'r': + strlcpy(config.recv_addr, optarg, sizeof(config.recv_addr)); + break; + case 'i': + strlcpy(config.recv_port, optarg, sizeof(config.recv_port)); + break; + case 's': + strlcpy(config.send_addr, optarg, sizeof(config.send_addr)); + break; + case 'p': + strlcpy(config.send_port, optarg, sizeof(config.send_port)); + break; + case 'o': + strlcpy(config.protocol_type, optarg, sizeof(config.protocol_type)); + break; + case 'n': + config.total_num = atoi(optarg); + break; + case 't': + strlcpy(config.payload_type, optarg, sizeof(config.payload_type)); + break; + case 'k': + strlcpy(config.socket_path, optarg, sizeof(config.socket_path)); + break; + case 'd': + config.interface_id = atoi(optarg); + break; + case 'l': + config.loop = (atoi(optarg) > 0); + break; + case 'x': + strlcpy(config.pix_fmt_string, optarg, sizeof(config.pix_fmt_string)); + break; + case 'm': + config.threads_num = atoi(optarg); + break; + case '?': + usage(stderr, argv[0]); + return 1; + default: + break; + } + } + + if (help_flag) { + usage(stdout, argv[0]); + return 0; + } + signal(SIGINT, intHandler); + + err = mesh_create_client(&client, NULL); + if (err) { + printf("Failed to create a mesh client: %s (%d)\n", mesh_err2str(err), err); + exit(-1); + } + + pthread_t *pong_threads = malloc(config.threads_num * sizeof(pthread_t *)); + int *threads_id = malloc(config.threads_num * sizeof(int *)); + + for (int i = 0; i < config.threads_num; i++) { + threads_id[i] = i; + if (pthread_create(&pong_threads[i], NULL, pong_thread, &threads_id[i]) != 0) { + perror("pthread_create"); + return 1; + } + usleep(100000); + } + + // Wait for all threads to finish + for (int i = 0; i < config.threads_num; i++) { + pthread_join(pong_threads[i], NULL); + } + + free(pong_threads); + free(threads_id); + + return 0; + + mesh_delete_client(&client); + + exit(-1); +} diff --git a/tests/single-node-sample-apps/test-rdma-latency.sh b/tests/single-node-sample-apps/test-rdma-latency.sh new file mode 100755 index 00000000..f0756ca6 --- /dev/null +++ b/tests/single-node-sample-apps/test-rdma-latency.sh @@ -0,0 +1,178 @@ +#!/bin/bash + +# SPDX-License-Identifier: BSD-3-Clause +# Copyright 2024 Intel Corporation + +# Directories +script_dir="$(readlink -f "$(dirname -- "${BASH_SOURCE[0]}")")" +bin_dir="$script_dir/../../_build/bin" +out_dir="$script_dir/out" + +# Media file names +output_file="$out_dir/out_rx.bin" + +# Video parameters +width="${1:-640}" +height="${2:-360}" +pixel_format="${3:-yuv422p10le}" +rdma_iface_ip="${4:-127.0.0.1}" +threads_num="${5:-1}" + +# Test configuration (cont'd) +wait_interval=$((25)) + +# Stdout/stderr forwarded output file names +tx_media_proxy_out="$out_dir/out_tx_media_proxy.txt" +rx_media_proxy_out="$out_dir/out_rx_media_proxy.txt" +sender_app_out="$out_dir/out_sender_app.txt" +recver_app_out="$out_dir/out_recver_app.txt" + +# Number of test repetitions +repeat_number=1 + +function message() { + local type="$1" + shift + local highlight_on="\e[38;05;33m" + local highlight_off="\e[m" + local timestamp="$(date +'%Y-%m-%d %H:%M:%S')" + echo -e "${highlight_on}$timestamp $type${highlight_off} $*" +} + +function info() { + local highlight_on='\e[38;05;45m'; + message "${highlight_on}[INFO]" "$*" +} + +function error() { + local highlight_on='\e[38;05;9m'; + message "${highlight_on}[ERRO]" "$*" +} + +function cleanup() { + rm -rf "$out_dir" &>/dev/null +} + +function run_in_background() { + # 1st argument is a command with arguments to be run in background + # 2nd argument is a file for stdout/stderr to be redirected to + # Returns PID of a spawned process + stdbuf -o0 -e0 $1 &>"$2" & +} + +function wait_text() { + # 1st argument is a timeout interval + # 2nd argument is a file to be monitored + # 3rd argument is a text to be looked for in the file + timeout $1 grep -q "$3" <(tail -n 100 -f "$2") + [ $? -eq 124 ] && error "timeout occurred" && return 1 + return 0 +} + +function wait_completion() { + # 1st argument is a timeout interval + info "Waiting for both sender and receiver to complete (interval ${1}s)" + local end_time=$((SECONDS + $1)) + + while kill -0 $sender_app_pid 2>/dev/null && kill -0 $recver_app_pid 2>/dev/null; do + if (( SECONDS >= end_time )); then + error "timeout occurred" + return 1 + fi + sleep 1 + done + + info "Transmission completed" + return 0 +} + +function shutdown_apps() { + info "Shutting down apps" + + kill "$sender_app_pid" &>/dev/null + kill "$recver_app_pid" &>/dev/null + + kill "$tx_media_proxy_pid" &>/dev/null + kill "$rx_media_proxy_pid" &>/dev/null +} + +function run_test_rdma() { + # Notice: recver_app should be started before sender_app. Currently, this is + # the only successful scenario. + + info "Connection type: rdma" + + info "Starting Tx side media_proxy" + local tx_media_proxy_cmd="media_proxy -t 8006" + run_in_background "$bin_dir/$tx_media_proxy_cmd" "$tx_media_proxy_out" + tx_media_proxy_pid="$!" + + sleep 1 + + info "Starting Rx side media_proxy" + local rx_media_proxy_cmd="media_proxy -t 8007" + run_in_background "$bin_dir/$rx_media_proxy_cmd" "$rx_media_proxy_out" + rx_media_proxy_pid="$!" + + sleep 1 + + info "Starting recver_app" + export MCM_MEDIA_PROXY_PORT=8006 + local recver_app_cmd="pong_app -s $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -o auto -m $threads_num" + run_in_background "$bin_dir/$recver_app_cmd" "$recver_app_out" + recver_app_pid="$!" + + sleep 1 + + + info "Starting sender_app" + export MCM_MEDIA_PROXY_PORT=8007 + local sender_app_cmd="ping_app -s $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -o auto -m $threads_num" + run_in_background "$bin_dir/$sender_app_cmd" "$sender_app_out" + sender_app_pid="$!" + + + info "Waiting for recver_app to connect to Rx media_proxy" + wait_text 10 $recver_app_out "Success connect to MCM media-proxy" + local recver_app_timeout="$?" + [ $recver_app_timeout -eq 0 ] && info "Connection established" + + info "Waiting for sender_app to connect to Tx media_proxy" + wait_text 10 $sender_app_out "Success connect to MCM media-proxy" + local sender_app_timeout="$?" + [ $sender_app_timeout -eq 0 ] && info "Connection established" + + wait_completion "$wait_interval" + local timeout="$?" + + sleep 1 + + shutdown_apps + + info "Cleanup" + # cleanup + + return $(($timeout || $recver_app_timeout || $sender_app_timeout)) +} + +info "Test MCM Tx/Rx for Single Node" +info " Binary directory: $(realpath $bin_dir)" +info " Output directory: $(realpath $out_dir)" +info " Frame size: $width x $height" +info " Pixel format: $pixel_format" + + +info "Initial cleanup" +cleanup + +for i in $(seq 1 $repeat_number); +do + info "--- Run #$i ---" + + mkdir -p "$out_dir" + run_test_rdma + + [ $? -ne 0 ] && error "Test failed" && exit 1 +done + +info "Test completed successfully" From a77f179269695f4afc2f71df63bd07046ecc1301 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Wed, 20 Nov 2024 15:46:58 +0000 Subject: [PATCH 3/4] Change pthead_exit() to return in pingpong apps Signed-off-by: Kasiewicz, Marek --- sdk/samples/ping_app.c | 20 ++++++++++---------- sdk/samples/pong_app.c | 14 +++++++------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sdk/samples/ping_app.c b/sdk/samples/ping_app.c index dee4d8bd..4e854e7c 100644 --- a/sdk/samples/ping_app.c +++ b/sdk/samples/ping_app.c @@ -107,7 +107,7 @@ typedef struct { double *latency_results; } t_data; -void *receiver_thread(void *arg) +void * receiver_thread(void *arg) { t_data *thread_data = (t_data *)arg; struct timespec send_time = {}, now = {}; @@ -122,14 +122,14 @@ void *receiver_thread(void *arg) err = mesh_create_connection(client, &conn); if (err) { printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } err = init_conn(conn, &config, MESH_CONN_KIND_RECEIVER, thread_data->thread_id); if (err) { printf("ERROR: init_conn failed \n"); mesh_delete_connection(&conn); - pthread_exit(NULL); + return NULL; } if (thread_data->thread_id == 0) @@ -140,7 +140,7 @@ void *receiver_thread(void *arg) CPU_SET((thread_data->thread_id + config.threads_num) % CPU_CORES, &cpuset); if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { perror("pthread_setaffinity_np"); - pthread_exit(NULL); + return NULL; } while (1) { @@ -174,10 +174,10 @@ void *receiver_thread(void *arg) if (err) printf("Failed to delete connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } -void *sender_thread(void *arg) +void * sender_thread(void *arg) { int thread_id = *(int *)arg; struct timespec send_time = {}; @@ -190,14 +190,14 @@ void *sender_thread(void *arg) err = mesh_create_connection(client, &conn); if (err) { printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } err = init_conn(conn, &config, MESH_CONN_KIND_SENDER, thread_id); if (err) { printf("ERROR: init_conn failed \n"); mesh_delete_connection(&conn); - pthread_exit(NULL); + return NULL; } // Pin the thread to specific CPU core @@ -205,7 +205,7 @@ void *sender_thread(void *arg) CPU_SET(thread_id % CPU_CORES, &cpuset); if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { perror("pthread_setaffinity_np"); - pthread_exit(NULL); + return NULL; } // Wait and send buffers based on atomic counter value @@ -234,7 +234,7 @@ void *sender_thread(void *arg) if (err) printf("Failed to delete connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } int main(int argc, char **argv) diff --git a/sdk/samples/pong_app.c b/sdk/samples/pong_app.c index 72d035c1..a085b52f 100644 --- a/sdk/samples/pong_app.c +++ b/sdk/samples/pong_app.c @@ -96,7 +96,7 @@ void usage(FILE *fp, const char *path) fprintf(fp, "\n"); } -void *pong_thread(void *arg) +void * pong_thread(void *arg) { int thread_id = *(int *)arg; cpu_set_t cpuset; @@ -111,28 +111,28 @@ void *pong_thread(void *arg) err = mesh_create_connection(client, &r_conn); if (err) { printf("Failed to create a recv mesh connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } err = init_conn(r_conn, &config, MESH_CONN_KIND_RECEIVER, thread_id); if (err) { printf("ERROR: init_conn failed \n"); mesh_delete_connection(&r_conn); - pthread_exit(NULL); + return NULL; } usleep(100000); err = mesh_create_connection(client, &s_conn); if (err) { printf("Failed to create a send mesh connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } err = init_conn(s_conn, &config, MESH_CONN_KIND_SENDER, thread_id); if (err) { printf("ERROR: init_conn failed \n"); mesh_delete_connection(&s_conn); - pthread_exit(NULL); + return NULL; } // Pin the thread to specific CPU core @@ -140,7 +140,7 @@ void *pong_thread(void *arg) CPU_SET((thread_id + config.threads_num * 2) % CPU_CORES, &cpuset); if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) { perror("pthread_setaffinity_np"); - pthread_exit(NULL); + return NULL; } while (1) { @@ -186,7 +186,7 @@ void *pong_thread(void *arg) if (err) printf("Failed to delete recv connection: %s (%d)\n", mesh_err2str(err), err); - pthread_exit(NULL); + return NULL; } int main(int argc, char **argv) From 5c9a1cd04cfe2a71c2cfa90d271baeb4053889e0 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Fri, 22 Nov 2024 08:23:56 +0000 Subject: [PATCH 4/4] Add sleep before deleting sender connection Signed-off-by: Kasiewicz, Marek --- sdk/samples/ping_app.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/samples/ping_app.c b/sdk/samples/ping_app.c index 4e854e7c..94f37042 100644 --- a/sdk/samples/ping_app.c +++ b/sdk/samples/ping_app.c @@ -229,6 +229,8 @@ void * sender_thread(void *arg) } } + sleep(1); + /* Clean up */ err = mesh_delete_connection(&conn); if (err)