Skip to content

Commit

Permalink
Support ext-net v9 API from the NCCL Plugin interface
Browse files Browse the repository at this point in the history
Update the Nvidia interface to support the v9 interface introduced
in NCCL 2.24.  The primary changes for NCCL are:
1. NIC fusion / virtual NIC support
2. API supports indication when completion is not needed (e.g. LL128)
3. DeviceProps supports max transfer size for p2p/coll operations
  • Loading branch information
yexiang-aws committed Jan 23, 2025
1 parent 309cea4 commit 0a55a9a
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 11 deletions.
19 changes: 19 additions & 0 deletions include/nccl_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ extern "C" {
/* Initial number of entries in the MR cache of a device */
#define NCCL_OFI_MR_CACHE_INIT_SIZE 128

/* Maximum number of channels in NCCL */
#define NCCL_OFI_MAX_CHANNELS (32ULL)

/*
 * Defined as equal to NCCL_OFI_MAX_CHANNELS. Used as the capacity of the
 * int scratch array in nccl_net_ofi_irecv_v9().
 */
#define NCCL_PROXY_MAX_SUBS NCCL_OFI_MAX_CHANNELS

/*
 * The ext-net v9 API interfaces changed the size arguments to size_t, but
 * the plugin's own isend/irecv implementations still take sizes as int.
 * Sizes arriving through the v9 entry points are therefore limited to
 * INT32_MAX.
 */
#define NCCL_OFI_MAX_NET_SIZE INT32_MAX

/* Indicates if GPUDirect is supported by libfabric provider */
enum gdr_support_level_t {GDR_UNKNOWN, GDR_SUPPORTED, GDR_UNSUPPORTED};
extern enum gdr_support_level_t support_gdr;
Expand Down Expand Up @@ -730,6 +742,13 @@ int get_inject_rma_size_opt(struct fid_ep *ofi_ep,
*/
long nccl_net_ofi_gettid(void);

/*
 * Check that a size_t transfer size does not exceed NCCL_OFI_MAX_NET_SIZE.
 *
 * @param	size	transfer size in bytes
 * @return	ncclSuccess when size <= NCCL_OFI_MAX_NET_SIZE,
 *		ncclInternalError otherwise
 */
static inline ncclResult_t is_size_valid(const size_t size) {
	return (size <= NCCL_OFI_MAX_NET_SIZE) ? ncclSuccess : ncclInternalError;
}

#ifdef __cplusplus
} // End extern "C"
#endif
Expand Down
2 changes: 2 additions & 0 deletions include/nccl_ofi_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ ncclResult_t nccl_net_ofi_regMrDmaBuf(void* comm, void* data, size_t size, int t
ncclResult_t nccl_net_ofi_deregMr(void *comm, void *mhandle);
ncclResult_t nccl_net_ofi_isend(void *sendComm, void* data, int size, int tag, void *mhandle, void** request);
ncclResult_t nccl_net_ofi_isend_v4(void* sendComm, void* data, int size, void* mhandle, void** request);
ncclResult_t nccl_net_ofi_isend_v9(void *sendComm, void* data, size_t size, int tag, void *mhandle, void** request);
ncclResult_t nccl_net_ofi_irecv(void* recvComm, int n, void** buffers, int* sizes, int *tags, void** mhandles, void** request);
ncclResult_t nccl_net_ofi_irecv_v4(void* recvComm, void* data, int size, void* mhandle, void** request);
ncclResult_t nccl_net_ofi_irecv_v9(void* recvComm, int n, void** buffers, size_t* sizes, int *tags, void** mhandles, void** request);
ncclResult_t nccl_net_ofi_test(void *request, int *done, int *size);
ncclResult_t nccl_net_ofi_iflush(void* recvComm, int n, void** buffers, int* sizes, void** mhandles, void** request);
ncclResult_t nccl_net_ofi_flush_v3(void* recvComm, void* data, int size, void* mhandle);
Expand Down
42 changes: 41 additions & 1 deletion src/nccl_ofi_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ ncclResult_t nccl_net_ofi_listen_v4(int dev, void* handle, void** listenComm)

/*
* @brief Non-blocking connect which returns sComm as NULL
* with an expectation that it will be called again until
* with an expectation that it will be called again until
* sComm != NULL
*
* The callee obtains one endpoint handle via the device's get_ep()
Expand Down Expand Up @@ -695,6 +695,27 @@ ncclResult_t nccl_net_ofi_isend_v4(void* sendComm, void* data, int size,
return nccl_net_ofi_isend(sendComm, data, size, 0, mhandle, request);
}

// static inline ncclResult_t is_size_valid(const size_t size) {
// if (size > NCCL_OFI_MAX_NET_SIZE) {
// return ncclInternalError;
// }
// return ncclSuccess;
// }

/*
 * ext-net v9 isend entry point.
 *
 * The v9 interface passes the message size as size_t while
 * nccl_net_ofi_isend() takes an int. The size is range-checked with
 * is_size_valid() and only then narrowed and forwarded.
 *
 * @return	the is_size_valid() error when size is too large,
 *		otherwise whatever nccl_net_ofi_isend() returns
 */
ncclResult_t nccl_net_ofi_isend_v9(void* sendComm, void* data, size_t size,
				   int tag, void* mhandle, void** request)
{
	const ncclResult_t size_check = is_size_valid(size);

	/* The cast below is only reached once the size is known to fit */
	if (size_check == ncclSuccess) {
		return nccl_net_ofi_isend(sendComm, data, (int)size, tag, mhandle, request);
	}

	return size_check;
}

ncclResult_t nccl_net_ofi_irecv(void* rComm, int n, void** buffers, int* sizes,
int *tags, void** mhandles, void** req)
Expand Down Expand Up @@ -744,6 +765,25 @@ ncclResult_t nccl_net_ofi_irecv_v4(void* recvComm, void* data, int size,
return nccl_net_ofi_irecv(recvComm, 1, &data, &size, &tag, &mhandle, request);
}

ncclResult_t nccl_net_ofi_irecv_v9(void* recvComm, int n, void** data,
size_t* sizes, int* tags, void** mhandles, void** request)
{
int sizesInt[NCCL_PROXY_MAX_SUBS];
ncclResult_t validation_result = ncclSuccess;
//reset to NULL if optional receive completion is set
if (*request == (void *) NCCL_NET_OPTIONAL_RECV_COMPLETION) {
*request = NULL;
}
for (int i = 0; i < n; i++) {
validation_result = is_size_valid(sizes[i]);
if (validation_result != ncclSuccess) {
return validation_result;
}
sizesInt[i] = (int)sizes[i];
}

return nccl_net_ofi_irecv(recvComm, n, data, sizesInt, tags, mhandles, request);
}

ncclResult_t nccl_net_ofi_test(void* req, int* done, int* size)
{
Expand Down
80 changes: 80 additions & 0 deletions src/nccl_ofi_interface_nvidia.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,61 @@
#include "nccl_ofi.h"
#include "nccl_ofi_api.h"

/*
 * ext-net v9 getProperties: query the plugin's device properties for dev_id
 * and translate them into the ncclNetProperties_v9_t layout.
 *
 * @return	the error from nccl_net_ofi_get_properties() when the query
 *		fails (props is left untouched), ncclSuccess otherwise
 */
static ncclResult_t getProperties_v9(int dev_id, ncclNetProperties_v9_t* props)
{
	nccl_ofi_properties_t ofi_properties;
	ncclResult_t ret;

	ret = nccl_net_ofi_get_properties(dev_id, &ofi_properties);
	if (ret != ncclSuccess) {
		return ret;
	}

	/* Device identity */
	props->name = ofi_properties.name;
	props->pciPath = ofi_properties.pci_path;
	props->guid = ofi_properties.guid;

	/* Host pointers are always accepted; CUDA and DMA-BUF only if reported */
	props->ptrSupport = NCCL_PTR_HOST
		| (ofi_properties.hmem_support ? NCCL_PTR_CUDA : 0)
		| (ofi_properties.dmabuf_support ? NCCL_PTR_DMABUF : 0);

	/**
	 * regIsGlobal=1 tells NCCL two things about the net-plugin:
	 * registered MRs are global, i.e. usable by all communicators,
	 * and the plugin keeps a fast MR cache, so calling regMr() again
	 * on the same buffer (address and size) quickly returns the MR
	 * that was previously registered globally for that buffer.
	 *
	 * With regIsGlobal=1, a buffer the user registers through
	 * ncclCommRegister() is registered globally by NCCL once (on
	 * each net device) with regMr(). Later, when the net
	 * proxy-thread starts executing a communication task on that
	 * user buffer, it calls regMr() again to quickly fetch the
	 * previously registered MR from the plugin-managed MR cache.
	 */
	props->regIsGlobal = ofi_properties.regIsGlobal;

	/* Link characteristics and limits */
	props->speed = ofi_properties.port_speed;
	props->port = ofi_properties.port_number;
	props->latency = ofi_properties.latency;
	props->maxComms = ofi_properties.max_communicators;
	props->maxRecvs = ofi_properties.max_group_receives;

	/* Transfer sizes are capped at what the int-based internals accept */
	props->maxP2pBytes = NCCL_OFI_MAX_NET_SIZE;
	props->maxCollBytes = NCCL_OFI_MAX_NET_SIZE;

	props->netDeviceType = NCCL_NET_DEVICE_HOST;
	props->netDeviceVersion = NCCL_NET_DEVICE_INVALID_VERSION;

	/* Each device is reported as a single-member device list: itself */
	props->vProps.ndevs = 1;
	props->vProps.devs[0] = dev_id;

	return ncclSuccess;
}

static ncclResult_t getProperties_v8(int dev_id, ncclNetProperties_v8_t* props)
{
nccl_ofi_properties_t ofi_properties;
Expand Down Expand Up @@ -319,6 +374,29 @@ NCCL_OFI_EXPORT_SYMBOL ncclNet_v8_t ncclNetPlugin_v8 = {
.irecvConsumed = NULL,
};

/*
 * ext-net v9 plugin table exported to NCCL.
 *
 * getProperties, isend and irecv point at the v9-specific wrappers; connect
 * and accept reuse the v7 wrappers; the remaining entries point directly at
 * the nccl_net_ofi_* implementations. The default name set here may be
 * overwritten by nvidia_plugin_name_fixup().
 */
NCCL_OFI_EXPORT_SYMBOL ncclNet_v9_t ncclNetPlugin_v9 = {
.name = "Libfabric",
.init = nccl_net_ofi_init,
.devices = nccl_net_ofi_devices,
.getProperties = getProperties_v9,
.listen = nccl_net_ofi_listen,
.connect = connect_v7,
.accept = accept_v7,
.regMr = nccl_net_ofi_regMr,
.regMrDmaBuf = nccl_net_ofi_regMrDmaBuf,
.deregMr = nccl_net_ofi_deregMr,
/* v9 wrappers: take size_t sizes and narrow them to int after validation */
.isend = nccl_net_ofi_isend_v9,
.irecv = nccl_net_ofi_irecv_v9,
.iflush = nccl_net_ofi_iflush,
.test = nccl_net_ofi_test,
.closeSend = nccl_net_ofi_closeSend,
.closeRecv = nccl_net_ofi_closeRecv,
.closeListen = nccl_net_ofi_closeListen,
/* No implementation is provided for the entries below. */
/* NOTE(review): presumably NCCL treats NULL as "unsupported" - confirm */
.getDeviceMr = NULL,
.irecvConsumed = NULL,
.makeVDevice = NULL,
};


/*
* Versions 1.11.0 and prior of the plugin set the name to
Expand All @@ -339,6 +417,7 @@ __attribute__((constructor)) static void nvidia_plugin_name_fixup(void)
ncclNetPlugin_v6.name = "AWS Libfabric";
ncclNetPlugin_v7.name = "AWS Libfabric";
ncclNetPlugin_v8.name = "AWS Libfabric";
ncclNetPlugin_v9.name = "AWS Libfabric";
} else if (net_env != NULL && 0 == strcasecmp(net_env, "OFI")) {
ncclNetPlugin_v2.name = "OFI";
ncclNetPlugin_v3.name = "OFI";
Expand All @@ -347,5 +426,6 @@ __attribute__((constructor)) static void nvidia_plugin_name_fixup(void)
ncclNetPlugin_v6.name = "OFI";
ncclNetPlugin_v7.name = "OFI";
ncclNetPlugin_v8.name = "OFI";
ncclNetPlugin_v9.name = "OFI";
}
}
2 changes: 1 addition & 1 deletion tests/functional/nccl_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int main(int argc, char* argv[])
nccl_net_ofi_send_comm_t *sComm = NULL;
nccl_net_ofi_listen_comm_t *lComm = NULL;
nccl_net_ofi_recv_comm_t *rComm = NULL;
ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore;
test_nccl_net_device_handle_t *s_ignore, *r_ignore;
char src_handle[NCCL_NET_HANDLE_MAXSIZE] = {};
char handle[NCCL_NET_HANDLE_MAXSIZE] = {};
test_nccl_net_t *extNet = NULL;
Expand Down
11 changes: 8 additions & 3 deletions tests/functional/nccl_message_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int main(int argc, char* argv[])
nccl_net_ofi_listen_comm_t *lComm = NULL;
nccl_net_ofi_recv_comm_t *rComm = NULL;
test_nccl_net_t *extNet = NULL;
ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore;
test_nccl_net_device_handle_t *s_ignore, *r_ignore;
char src_handle[NCCL_NET_HANDLE_MAXSIZE] = {};

ofi_log_function = logger;
Expand Down Expand Up @@ -62,7 +62,8 @@ int main(int argc, char* argv[])
/* For grouped recvs */
int tag = 1;
int nrecv = NCCL_OFI_MAX_RECVS;
int *sizes = (int *)malloc(sizeof(int)*nrecv);
size_t *sizes = (size_t *)malloc(sizeof(size_t) * nrecv);
int sizesInt[nrecv];
int *tags = (int *)malloc(sizeof(int)*nrecv);
if (sizes == NULL || tags == NULL) {
NCCL_OFI_WARN("Failed to allocate memory");
Expand Down Expand Up @@ -233,6 +234,10 @@ int main(int argc, char* argv[])
for (int recv_n = 0; recv_n < nrecv; recv_n++) {
sizes[recv_n] = recv_sizes[szidx];
tags[recv_n] = tag;

OFINCCLCHECKGOTO(is_size_valid(sizes[recv_n]), res,
exit);
sizesInt[recv_n] = sizes[recv_n];
}

/* Allocate and populate expected buffer */
Expand Down Expand Up @@ -317,7 +322,7 @@ int main(int argc, char* argv[])
nccl_net_ofi_req_t *iflush_req = NULL;
OFINCCLCHECKGOTO(
extNet->iflush((void *)rComm, nrecv,
(void **)&recv_buf[idx], sizes,
(void **)&recv_buf[idx], sizesInt,
&mhandle[idx], (void **)&iflush_req),
res, exit);
done = 0;
Expand Down
12 changes: 9 additions & 3 deletions tests/functional/ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ int main(int argc, char *argv[])
char handle[NCCL_NET_HANDLE_MAXSIZE] = {};
char src_handle_prev[NCCL_NET_HANDLE_MAXSIZE] = {};
char src_handle_next[NCCL_NET_HANDLE_MAXSIZE] = {};
ncclNetDeviceHandle_v8_t *s_ignore, *r_ignore;

test_nccl_net_device_handle_t *s_ignore, *r_ignore;
test_nccl_net_t *extNet = NULL;

ofi_log_function = logger;
Expand All @@ -50,7 +51,8 @@ int main(int argc, char *argv[])
/* For grouped receives */
int tag = 1;
int nrecv = NCCL_OFI_MAX_RECVS;
int *sizes = (int *)malloc(sizeof(int)*nrecv);
size_t *sizes = (size_t *)malloc(sizeof(size_t) * nrecv);
int sizesInt[nrecv];
int *tags = (int *)malloc(sizeof(int)*nrecv);
if (sizes == NULL || tags == NULL) {
NCCL_OFI_WARN("Failed to allocate memory");
Expand All @@ -61,6 +63,10 @@ int main(int argc, char *argv[])
for (int recv_n = 0; recv_n < nrecv; recv_n++) {
sizes[recv_n] = RECV_SIZE;
tags[recv_n] = tag;

OFINCCLCHECKGOTO(is_size_valid(sizes[recv_n]), res,
exit);
sizesInt[recv_n] = sizes[recv_n];
}

/* Start up MPI */
Expand Down Expand Up @@ -259,7 +265,7 @@ int main(int argc, char *argv[])
idx);
nccl_net_ofi_req_t *iflush_req = NULL;
OFINCCLCHECKGOTO(extNet->iflush((void *)rComm, nrecv,
(void **)&recv_buf[idx], sizes,
(void **)&recv_buf[idx], sizesInt,
&recv_mhandle[idx], (void **)&iflush_req), res, exit);
done = 0;
if (iflush_req) {
Expand Down
7 changes: 4 additions & 3 deletions tests/functional/test-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@
} while(false);

// Can be changed when porting new versions to the plugin
#define NCCL_PLUGIN_SYMBOL ncclNetPlugin_v8
#define NCCL_PLUGIN_SYMBOL ncclNetPlugin_v9

typedef ncclNet_v8_t test_nccl_net_t;
typedef ncclNetProperties_v8_t test_nccl_properties_t;
typedef ncclNet_v9_t test_nccl_net_t;
typedef ncclNetProperties_v9_t test_nccl_properties_t;
typedef ncclNetDeviceHandle_v9_t test_nccl_net_device_handle_t;

static void logger(ncclDebugLogLevel level, unsigned long flags, const char *filefunc,
int line, const char *fmt, ...)
Expand Down

0 comments on commit 0a55a9a

Please sign in to comment.