Skip to content

Commit

Permalink
src: Change shmem_internal_put_wait implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmarshall21 committed Jun 11, 2024
1 parent 5aa2723 commit 9c459ea
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 46 deletions.
26 changes: 13 additions & 13 deletions src/collectives.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ shmem_internal_bcast_linear(void *target, const void *source, size_t len,
if (pe == shmem_internal_my_pe) continue;
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, len, pe, &completion, nic_idx);
}
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);

shmem_internal_fence(SHMEM_CTX_DEFAULT);

Expand Down Expand Up @@ -522,7 +522,7 @@ shmem_internal_bcast_tree(void *target, const void *source, size_t len,
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, send_buf, len, children[i],
&completion, nic_idx);
}
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);

shmem_internal_fence(SHMEM_CTX_DEFAULT);

Expand Down Expand Up @@ -591,7 +591,7 @@ shmem_internal_op_to_all_linear(void *target, const void *source, size_t count,
exist. */
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
shmem_internal_my_pe, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);

/* let everyone know that it's safe to send to us */
Expand Down Expand Up @@ -619,7 +619,7 @@ shmem_internal_op_to_all_linear(void *target, const void *source, size_t count,
/* send data, ack, and wait for completion */
shmem_internal_atomicv(SHMEM_CTX_DEFAULT, target, source, count * type_size,
PE_start, op, datatype, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
Expand Down Expand Up @@ -798,7 +798,7 @@ shmem_internal_op_to_all_tree(void *target, const void *source, size_t count, si
exist. */
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
shmem_internal_my_pe, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);

/* let everyone know that it's safe to send to us */
Expand Down Expand Up @@ -827,7 +827,7 @@ shmem_internal_op_to_all_tree(void *target, const void *source, size_t count, si
(num_children == 0) ? source : target,
count * type_size, parent,
op, datatype, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
Expand Down Expand Up @@ -904,7 +904,7 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, current_target, wrk_size, peer,
&completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync_extra_peer, &ps_data_ready, sizeof(long), peer, nic_idx);
Expand Down Expand Up @@ -934,7 +934,7 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, current_target,
wrk_size, peer, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, step_psync, &ps_data_ready,
sizeof(long), peer, nic_idx);
Expand All @@ -944,7 +944,7 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, current_target,
wrk_size, peer, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, step_psync, &ps_data_ready,
sizeof(long), peer, nic_idx);
Expand All @@ -962,7 +962,7 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, current_target, wrk_size,
peer, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync_extra_peer, &ps_data_ready,
sizeof(long), peer, nic_idx);
Expand Down Expand Up @@ -1084,7 +1084,7 @@ shmem_internal_fcollect_linear(void *target, const void *source, size_t len,
size_t offset = ((shmem_internal_my_pe - PE_start) / PE_stride) * len;
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, (char*) target + offset, source, len, PE_start,
&completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);

/* ensure ordering */
shmem_internal_fence(SHMEM_CTX_DEFAULT);
Expand Down Expand Up @@ -1136,7 +1136,7 @@ shmem_internal_fcollect_ring(void *target, const void *source, size_t len,
/* send data to me + 1 */
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, (char*) target + iter_offset, (char*) target + iter_offset,
len, next_proc, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

/* send completion for this round to next proc. Note that we
Expand Down Expand Up @@ -1199,7 +1199,7 @@ shmem_internal_fcollect_recdbl(void *target, const void *source, size_t len,
/* send data to peer */
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, (char*) target + curr_offset, (char*) target + curr_offset,
distance * len, real_peer, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

/* mark completion for this round */
Expand Down
14 changes: 7 additions & 7 deletions src/data_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
shmem_internal_put_nb(ctx, target, source, \
sizeof(TYPE) * nelems, pe, \
&completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
}


Expand All @@ -372,7 +372,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \
shmem_internal_put_nb(ctx, target, source, (SIZE) * nelems,\
pe, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
}


Expand Down Expand Up @@ -555,7 +555,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
target += tst; \
source += sst; \
} \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
}

#define SHMEM_DEF_IPUT_N(NAME,SIZE) \
Expand Down Expand Up @@ -614,7 +614,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
target = (uint8_t *) target + tst * (SIZE); \
source = (uint8_t *) source + sst * (SIZE); \
} \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
}

#define SHMEM_DEF_IGET(STYPE,TYPE) \
Expand Down Expand Up @@ -757,7 +757,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
shmem_internal_put_nb(ctx, target, source, \
sizeof(TYPE) * nelems, pe, \
&completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
shmem_internal_fence(ctx); \
if (sig_op == SHMEM_SIGNAL_ADD) \
shmem_internal_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), \
Expand Down Expand Up @@ -791,7 +791,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \
shmem_internal_put_nb(ctx, target, source, (SIZE) * nelems, \
pe, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion, nic_idx); \
shmem_internal_put_wait(ctx, &completion); \
shmem_internal_fence(ctx); \
if (sig_op == SHMEM_SIGNAL_ADD) \
shmem_internal_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), \
Expand Down Expand Up @@ -1029,7 +1029,7 @@ void SHMEM_FUNCTION_ATTRIBUTES shmemx_putmem_ct(shmemx_ct_t ct, void *target, co
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_put_ct_nb(ct, target, source, nelems, pe, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
}


Expand Down
8 changes: 4 additions & 4 deletions src/shmem_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t

static inline
void
shmem_internal_put_wait(shmem_ctx_t ctx, long *completion, size_t nic_idx)
shmem_internal_put_wait(shmem_ctx_t ctx, long *completion)
{
shmem_transport_put_wait((shmem_transport_ctx_t *)ctx, completion, nic_idx);
shmem_transport_put_wait((shmem_transport_ctx_t *)ctx, completion);
/* on-node is always blocking, so this is a no-op for them */
}

Expand All @@ -69,7 +69,7 @@ shmem_internal_put_scalar(shmem_ctx_t ctx, void *target, const void *source, siz
#else
long completion = 0;
shmem_transport_put_nb((shmem_transport_ctx_t *)ctx, target, source, len, pe, &completion, nic_idx);
shmem_internal_put_wait(ctx, &completion, nic_idx);
shmem_internal_put_wait(ctx, &completion);
#endif
}
}
Expand Down Expand Up @@ -381,7 +381,7 @@ void shmem_internal_copy_self(void *dest, const void *source, size_t nelems, siz
long completion = 0;
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, dest, source, nelems,
shmem_internal_my_pe, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
#else
memcpy(dest, source, nelems);
#endif
Expand Down
4 changes: 2 additions & 2 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -2121,8 +2121,8 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
ctxp->stx_idx = malloc(shmem_transport_ofi_num_nics * sizeof(int));
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
#ifndef USE_CTX_LOCK
shmem_internal_cntr_write(&ctxp->pending_put_cntr, 0);
shmem_internal_cntr_write(&ctxp->pending_get_cntr, 0);
shmem_internal_cntr_write(&ctxp->pending_put_cntr[idx], 0);
shmem_internal_cntr_write(&ctxp->pending_get_cntr[idx], 0);
#else
ctxp->pending_put_cntr[idx] = 0;
ctxp->pending_get_cntr[idx] = 0;
Expand Down
47 changes: 27 additions & 20 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,16 @@ void shmem_transport_probe(void)
# ifdef USE_THREAD_COMPLETION
if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
# endif
struct fi_cq_entry buf;
int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
if (ret == 1)
RAISE_WARN_STR("Unexpected event");
// struct fi_cq_entry buf;
// int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
// if (ret == 1)
// RAISE_WARN_STR("Unexpected event");
for (size_t i = 0; i < shmem_transport_ofi_num_nics; i++) {
struct fi_cq_entry buf;
int ret = fi_cq_read(shmem_transport_ctx_default.cq[i], &buf, 1);
if (ret == 1)
RAISE_WARN_STR("Unexpected event");
}
# ifdef USE_THREAD_COMPLETION
pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
}
Expand Down Expand Up @@ -427,7 +433,7 @@ shmem_transport_ofi_bounce_buffer_t * create_bounce_buffer(shmem_transport_ctx_t
}

static inline
void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t idx)
void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx)
{
SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);

Expand Down Expand Up @@ -456,12 +462,13 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t idx)
shmem_transport_ofi_put_poll_limit < 0) {
success = 0;
fail = 0;
cnt = 0;

//for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
success = fi_cntr_read(ctx->put_cntr[idx]); /* FIXED? */
fail = fi_cntr_readerr(ctx->put_cntr[idx]); /* FIXED? */
cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */
//}
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
success += fi_cntr_read(ctx->put_cntr[idx]); /* FIXED? */
fail += fi_cntr_readerr(ctx->put_cntr[idx]); /* FIXED? */
cnt += SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */
}
shmem_transport_probe();

if (success < cnt && fail == 0) {
Expand All @@ -476,7 +483,7 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t idx)
}
poll_count++;
}
//for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */
do {
cnt = cnt_new;
Expand All @@ -485,16 +492,16 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t idx)
OFI_CTX_CHECK_ERROR(ctx, ret);
} while (cnt < cnt_new);
shmem_internal_assert(cnt == cnt_new);
//}
}

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}

static inline
int shmem_transport_quiet(shmem_transport_ctx_t* ctx)
{
shmem_transport_put_quiet(ctx);
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
shmem_transport_put_quiet(ctx, idx);
shmem_transport_get_wait(ctx, idx);
}

Expand All @@ -505,12 +512,12 @@ int shmem_transport_quiet(shmem_transport_ctx_t* ctx)
static inline
int shmem_transport_fence(shmem_transport_ctx_t* ctx)
{
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
#if WANT_TOTAL_DATA_ORDERING == 0
/* Communication is unordered; must wait for puts and buffered (injected)
* non-fetching atomics to be completed in order to ensure ordering. */
shmem_transport_put_quiet(ctx, idx);
/* Communication is unordered; must wait for puts and buffered (injected)
* non-fetching atomics to be completed in order to ensure ordering. */
shmem_transport_put_quiet(ctx);
#endif
for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) {
/* Complete fetching ops; needed to support nonblocking fetch-atomics */
shmem_transport_get_wait(ctx, idx);
}
Expand Down Expand Up @@ -699,7 +706,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co
uint8_t *src_buf = (uint8_t *) source;

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]);

const struct iovec msg_iov = {
.iov_base = src_buf,
Expand Down Expand Up @@ -828,11 +835,11 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co

/* compatibility with Portals transport */
static inline
void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) {
void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) {

shmem_internal_assert((*completion) >= 0);
if((*completion) > 0) {
shmem_transport_put_quiet(ctx, nic_idx);
shmem_transport_put_quiet(ctx);
(*completion)--;
}
}
Expand Down

0 comments on commit 9c459ea

Please sign in to comment.