From 0a55a9a12c94f081bef74a20765cbc33926d6238 Mon Sep 17 00:00:00 2001 From: yexiang-aws Date: Thu, 23 Jan 2025 08:17:11 +0000 Subject: [PATCH] Support ext-net v9 API from the NCCL Plugin interface Update the Nvidia interface to support the v9 interface introduced in NCCL 2.24. The primary changes for NCCL are: 1. NIC fusion / vitual NIC support 2. API supports indication when completion is not needed (e.g. LL128) 3. DeviceProps supports max transfer size for p2p/coll operations --- include/nccl_ofi.h | 19 ++++++ include/nccl_ofi_api.h | 2 + src/nccl_ofi_api.c | 42 ++++++++++++- src/nccl_ofi_interface_nvidia.c | 80 ++++++++++++++++++++++++ tests/functional/nccl_connection.c | 2 +- tests/functional/nccl_message_transfer.c | 11 +++- tests/functional/ring.c | 12 +++- tests/functional/test-common.h | 7 ++- 8 files changed, 164 insertions(+), 11 deletions(-) diff --git a/include/nccl_ofi.h b/include/nccl_ofi.h index f385b9152..42268c814 100644 --- a/include/nccl_ofi.h +++ b/include/nccl_ofi.h @@ -81,6 +81,18 @@ extern "C" { /* Initial number of entries in the MR cache of a device */ #define NCCL_OFI_MR_CACHE_INIT_SIZE 128 +/* Maximum number of channels in NCCL */ +#define NCCL_OFI_MAX_CHANNELS (32ULL) + +#define NCCL_PROXY_MAX_SUBS NCCL_OFI_MAX_CHANNELS + +/* + * ext-net v9 API interfaces updated the sizes to int type. But sizes in the + * actual plugin implementations are using int type, thus the max should use + * INT32_MAX + */ +#define NCCL_OFI_MAX_NET_SIZE INT32_MAX + /* Indicates if GPUDirect is supported by libfabric provider */ enum gdr_support_level_t {GDR_UNKNOWN, GDR_SUPPORTED, GDR_UNSUPPORTED}; extern enum gdr_support_level_t support_gdr; @@ -730,6 +742,13 @@ int get_inject_rma_size_opt(struct fid_ep *ofi_ep, */ long nccl_net_ofi_gettid(void); +static inline ncclResult_t is_size_valid(const size_t size) { + if (size > NCCL_OFI_MAX_NET_SIZE) { + return ncclInternalError; + } + return ncclSuccess; +} + #ifdef __cplusplus } // End extern "C" #endif diff --git a/include/nccl_ofi_api.h b/include/nccl_ofi_api.h index be462d893..86d9b8070 100644 --- a/include/nccl_ofi_api.h +++ b/include/nccl_ofi_api.h @@ -33,8 +33,10 @@ ncclResult_t nccl_net_ofi_regMrDmaBuf(void* comm, void* data, size_t size, int t ncclResult_t nccl_net_ofi_deregMr(void *comm, void *mhandle); ncclResult_t nccl_net_ofi_isend(void *sendComm, void* data, int size, int tag, void *mhandle, void** request); ncclResult_t nccl_net_ofi_isend_v4(void* sendComm, void* data, int size, void* mhandle, void** request); +ncclResult_t nccl_net_ofi_isend_v9(void *sendComm, void* data, size_t size, int tag, void *mhandle, void** request); ncclResult_t nccl_net_ofi_irecv(void* recvComm, int n, void** buffers, int* sizes, int *tags, void** mhandles, void** request); ncclResult_t nccl_net_ofi_irecv_v4(void* recvComm, void* data, int size, void* mhandle, void** request); +ncclResult_t nccl_net_ofi_irecv_v9(void* recvComm, int n, void** buffers, size_t* sizes, int *tags, void** mhandles, void** request); ncclResult_t nccl_net_ofi_test(void *request, int *done, int *size); ncclResult_t nccl_net_ofi_iflush(void* recvComm, int n, void** buffers, int* sizes, void** mhandles, void** request); ncclResult_t nccl_net_ofi_flush_v3(void* recvComm, void* data, int size, void* mhandle); diff --git a/src/nccl_ofi_api.c b/src/nccl_ofi_api.c index 65e63339b..20a427232 100644 --- a/src/nccl_ofi_api.c +++ b/src/nccl_ofi_api.c @@ -253,7 +253,7 @@ ncclResult_t nccl_net_ofi_listen_v4(int dev, void* handle, void** listenComm) /* * @brief Non-blocking connect which returns sComm as NULL - * with an expectation that it will be called again until + * with an expectation that it will be called again until * sComm != NULL * * The callee obtains one endpoint handle via the device's get_ep() @@ -695,6 +695,27 @@ ncclResult_t nccl_net_ofi_isend_v4(void* sendComm, void* data, int size, return nccl_net_ofi_isend(sendComm, data, size, 0, mhandle, request); } +// static inline ncclResult_t is_size_valid(const size_t size) { +// if (size > NCCL_OFI_MAX_NET_SIZE) { +// return ncclInternalError; +// } +// return ncclSuccess; +// } + +ncclResult_t nccl_net_ofi_isend_v9(void* sendComm, void* data, size_t size, + int tag, void* mhandle, void** request) +{ + // Validate input size + ncclResult_t validation_result = is_size_valid(size); + if (validation_result != ncclSuccess) { + return validation_result; + } + + // Safe conversion from size_t to int after validation + int sizeInt = (int)size; + + return nccl_net_ofi_isend(sendComm, data, sizeInt, tag, mhandle, request); +} ncclResult_t nccl_net_ofi_irecv(void* rComm, int n, void** buffers, int* sizes, int *tags, void** mhandles, void** req) @@ -744,6 +765,25 @@ ncclResult_t nccl_net_ofi_irecv_v4(void* recvComm, void* data, int size, return nccl_net_ofi_irecv(recvComm, 1, &data, &size, &tag, &mhandle, request); } +ncclResult_t nccl_net_ofi_irecv_v9(void* recvComm, int n, void** data, + size_t* sizes, int* tags, void** mhandles, void** request) +{ + int sizesInt[NCCL_PROXY_MAX_SUBS]; + ncclResult_t validation_result = ncclSuccess; + //reset to NULL if optional receive completion is set + if (*request == (void *) NCCL_NET_OPTIONAL_RECV_COMPLETION) { + *request = NULL; + } + for (int i = 0; i < n; i++) { + validation_result = is_size_valid(sizes[i]); + if (validation_result != ncclSuccess) { + return validation_result; + } + sizesInt[i] = (int)sizes[i]; + } + + return nccl_net_ofi_irecv(recvComm, n, data, sizesInt, tags, mhandles, request); +} ncclResult_t nccl_net_ofi_test(void* req, int* done, int* size) { diff --git a/src/nccl_ofi_interface_nvidia.c b/src/nccl_ofi_interface_nvidia.c index 042f8ba5c..8b7e79252 100644 --- a/src/nccl_ofi_interface_nvidia.c +++ b/src/nccl_ofi_interface_nvidia.c @@ -7,6 +7,61 @@ #include "nccl_ofi.h" #include "nccl_ofi_api.h" +static ncclResult_t getProperties_v9(int dev_id, ncclNetProperties_v9_t* props) +{ + nccl_ofi_properties_t ofi_properties; + ncclResult_t ret = nccl_net_ofi_get_properties(dev_id, &ofi_properties); + if (ret != ncclSuccess) { + return ret; + } + + props->name = ofi_properties.name; + props->pciPath = ofi_properties.pci_path; + props->guid = ofi_properties.guid; + props->ptrSupport = NCCL_PTR_HOST; + if (ofi_properties.hmem_support) { + props->ptrSupport |= NCCL_PTR_CUDA; + } + if (ofi_properties.dmabuf_support) { + props->ptrSupport |= NCCL_PTR_DMABUF; + } + + /** + * When net-plugin returns regIsGlobal=1 to NCCL (As part of + * net-plugin getProperties() API), it signals to NCCL that + * registered MRs are global, in the sense that they can be + * used by all communicators. In addition, it also signals to + * NCCL that the net-plugin have a fast MR cache such that + * calling regMr() on same buffer (address and size), will + * quickly return a previously globally registered MR on same + * buffer. + * + * When user registers a buffer with NCCL by using + * ncclCommRegister() API, if net-plugin supports + * regIsGlobal=1, NCCL will register the buffer globally once + * (On each net device) with regMr() API. When the net + * proxy-thread starts to execute a communication task on a + * previously registered user buffer, it will call the + * net-plugin regMr() to quickly fetch the previously globally + * registered MR from the plugin managed MR cache. + */ + props->regIsGlobal = ofi_properties.regIsGlobal; + + props->speed = ofi_properties.port_speed; + props->port = ofi_properties.port_number; + props->latency = ofi_properties.latency; + props->maxComms = ofi_properties.max_communicators; + props->maxRecvs = ofi_properties.max_group_receives; + props->netDeviceType = NCCL_NET_DEVICE_HOST; + props->netDeviceVersion = NCCL_NET_DEVICE_INVALID_VERSION; + props->vProps.ndevs = 1; + props->vProps.devs[0] = dev_id; + props->maxP2pBytes = NCCL_OFI_MAX_NET_SIZE; + props->maxCollBytes = NCCL_OFI_MAX_NET_SIZE; + + return ncclSuccess; +} + static ncclResult_t getProperties_v8(int dev_id, ncclNetProperties_v8_t* props) { nccl_ofi_properties_t ofi_properties; @@ -319,6 +374,29 @@ NCCL_OFI_EXPORT_SYMBOL ncclNet_v8_t ncclNetPlugin_v8 = { .irecvConsumed = NULL, }; +NCCL_OFI_EXPORT_SYMBOL ncclNet_v9_t ncclNetPlugin_v9 = { + .name = "Libfabric", + .init = nccl_net_ofi_init, + .devices = nccl_net_ofi_devices, + .getProperties = getProperties_v9, + .listen = nccl_net_ofi_listen, + .connect = connect_v7, + .accept = accept_v7, + .regMr = nccl_net_ofi_regMr, + .regMrDmaBuf = nccl_net_ofi_regMrDmaBuf, + .deregMr = nccl_net_ofi_deregMr, + .isend = nccl_net_ofi_isend_v9, + .irecv = nccl_net_ofi_irecv_v9, + .iflush = nccl_net_ofi_iflush, + .test = nccl_net_ofi_test, + .closeSend = nccl_net_ofi_closeSend, + .closeRecv = nccl_net_ofi_closeRecv, + .closeListen = nccl_net_ofi_closeListen, + .getDeviceMr = NULL, + .irecvConsumed = NULL, + .makeVDevice = NULL, +}; + /* * Versions 1.11.0 and prior of the plugin set the name to @@ -339,6 +417,7 @@ __attribute__((constructor)) static void nvidia_plugin_name_fixup(void) ncclNetPlugin_v6.name = "AWS Libfabric"; ncclNetPlugin_v7.name = "AWS Libfabric"; ncclNetPlugin_v8.name = "AWS Libfabric"; + ncclNetPlugin_v9.name = "AWS Libfabric"; } else if (net_env != NULL && 0 == strcasecmp(net_env, "OFI")) { ncclNetPlugin_v2.name = "OFI"; ncclNetPlugin_v3.name = "OFI"; @@ -347,5 +426,6 @@ __attribute__((constructor)) static void nvidia_plugin_name_fixup(void) ncclNetPlugin_v6.name = "OFI"; ncclNetPlugin_v7.name = "OFI"; ncclNetPlugin_v8.name = "OFI"; + ncclNetPlugin_v9.name = "OFI"; } } diff --git a/tests/functional/nccl_connection.c b/tests/functional/nccl_connection.c index 3874ec975..28a5dac18 100644 --- a/tests/functional/nccl_connection.c +++ b/tests/functional/nccl_connection.c @@ -21,7 +21,7 @@ int main(int argc, char* argv[]) nccl_net_ofi_send_comm_t *sComm = NULL; nccl_net_ofi_listen_comm_t *lComm = NULL; nccl_net_ofi_recv_comm_t *rComm = NULL; - ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore; + test_nccl_net_device_handle_t *s_ignore, *r_ignore; char src_handle[NCCL_NET_HANDLE_MAXSIZE] = {}; char handle[NCCL_NET_HANDLE_MAXSIZE] = {}; test_nccl_net_t *extNet = NULL; diff --git a/tests/functional/nccl_message_transfer.c b/tests/functional/nccl_message_transfer.c index 3b2421ba0..3cea97bac 100644 --- a/tests/functional/nccl_message_transfer.c +++ b/tests/functional/nccl_message_transfer.c @@ -26,7 +26,7 @@ int main(int argc, char* argv[]) nccl_net_ofi_listen_comm_t *lComm = NULL; nccl_net_ofi_recv_comm_t *rComm = NULL; test_nccl_net_t *extNet = NULL; - ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore; + test_nccl_net_device_handle_t *s_ignore, *r_ignore; char src_handle[NCCL_NET_HANDLE_MAXSIZE] = {}; ofi_log_function = logger; @@ -62,7 +62,8 @@ int main(int argc, char* argv[]) /* For grouped recvs */ int tag = 1; int nrecv = NCCL_OFI_MAX_RECVS; - int *sizes = (int *)malloc(sizeof(int)*nrecv); + size_t *sizes = (size_t *)malloc(sizeof(size_t) * nrecv); + int sizesInt[nrecv]; int *tags = (int *)malloc(sizeof(int)*nrecv); if (sizes == NULL || tags == NULL) { NCCL_OFI_WARN("Failed to allocate memory"); @@ -233,6 +234,10 @@ int main(int argc, char* argv[]) for (int recv_n = 0; recv_n < nrecv; recv_n++) { sizes[recv_n] = recv_sizes[szidx]; tags[recv_n] = tag; + + OFINCCLCHECKGOTO(is_size_valid(sizes[recv_n]), res, + exit); + sizesInt[recv_n] = sizes[recv_n]; } /* Allocate and populate expected buffer */ @@ -317,7 +322,7 @@ int main(int argc, char* argv[]) nccl_net_ofi_req_t *iflush_req = NULL; OFINCCLCHECKGOTO( extNet->iflush((void *)rComm, nrecv, - (void **)&recv_buf[idx], sizes, + (void **)&recv_buf[idx], sizesInt, &mhandle[idx], (void **)&iflush_req), res, exit); done = 0; diff --git a/tests/functional/ring.c b/tests/functional/ring.c index 77aac75a6..aff9c281a 100644 --- a/tests/functional/ring.c +++ b/tests/functional/ring.c @@ -23,7 +23,8 @@ int main(int argc, char *argv[]) char handle[NCCL_NET_HANDLE_MAXSIZE] = {}; char src_handle_prev[NCCL_NET_HANDLE_MAXSIZE] = {}; char src_handle_next[NCCL_NET_HANDLE_MAXSIZE] = {}; - ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore; + + test_nccl_net_device_handle_t *s_ignore, *r_ignore; test_nccl_net_t *extNet = NULL; ofi_log_function = logger; @@ -50,7 +51,8 @@ int main(int argc, char *argv[]) /* For grouped receives */ int tag = 1; int nrecv = NCCL_OFI_MAX_RECVS; - int *sizes = (int *)malloc(sizeof(int)*nrecv); + size_t *sizes = (size_t *)malloc(sizeof(size_t) * nrecv); + int sizesInt[nrecv]; int *tags = (int *)malloc(sizeof(int)*nrecv); if (sizes == NULL || tags == NULL) { NCCL_OFI_WARN("Failed to allocate memory"); @@ -61,6 +63,10 @@ int main(int argc, char *argv[]) for (int recv_n = 0; recv_n < nrecv; recv_n++) { sizes[recv_n] = RECV_SIZE; tags[recv_n] = tag; + + OFINCCLCHECKGOTO(is_size_valid(sizes[recv_n]), res, + exit); + sizesInt[recv_n] = sizes[recv_n]; } /* Start up MPI */ @@ -259,7 +265,7 @@ int main(int argc, char *argv[]) idx); nccl_net_ofi_req_t *iflush_req = NULL; OFINCCLCHECKGOTO(extNet->iflush((void *)rComm, nrecv, - (void **)&recv_buf[idx], sizes, + (void **)&recv_buf[idx], sizesInt, &recv_mhandle[idx], (void **)&iflush_req), res, exit); done = 0; if (iflush_req) { diff --git a/tests/functional/test-common.h b/tests/functional/test-common.h index e83017423..117bf9543 100644 --- a/tests/functional/test-common.h +++ b/tests/functional/test-common.h @@ -57,10 +57,11 @@ } while(false); // Can be changed when porting new versions to the plugin -#define NCCL_PLUGIN_SYMBOL ncclNetPlugin_v8 +#define NCCL_PLUGIN_SYMBOL ncclNetPlugin_v9 -typedef ncclNet_v8_t test_nccl_net_t; -typedef ncclNetProperties_v8_t test_nccl_properties_t; +typedef ncclNet_v9_t test_nccl_net_t; +typedef ncclNetProperties_v9_t test_nccl_properties_t; +typedef ncclNetDeviceHandle_v9_t test_nccl_net_device_handle_t; static void logger(ncclDebugLogLevel level, unsigned long flags, const char *filefunc, int line, const char *fmt, ...)