Skip to content

Commit

Permalink
day 2 progress
Browse files Browse the repository at this point in the history
Signed-off-by: Seth Zegelstein <szegel@amazon.com>
  • Loading branch information
a-szegel committed Jan 24, 2024
1 parent 4729af9 commit d6ef023
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 67 deletions.
112 changes: 110 additions & 2 deletions include/ofi_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ struct util_ep {

struct ofi_bitmask *coll_cid_mask;
struct slist coll_ready_queue;
ofi_atomic32_t cq_ref;
};

int ofi_ep_bind_av(struct util_ep *util_ep, struct util_av *av);
Expand Down Expand Up @@ -498,13 +499,120 @@ OFI_DECLARE_CIRQUE(struct fi_cq_tagged_entry, util_comp_cirq);

typedef void (*ofi_cq_progress_func)(struct util_cq *cq);

/*
 * Fixed-capacity array of endpoint pointers.
 *
 * eps     - array of num_eps endpoint pointers (NULL when num_eps is 0)
 * num_eps - number of slots in eps
 */
struct ofi_ep_list {
	struct util_ep **eps;
	size_t num_eps;
};

/*
 * Allocate a list with room for num_eps endpoint pointers.
 *
 * The slots are zero-initialized.  A list created with num_eps == 0 has
 * eps == NULL.
 *
 * Returns the new list, or NULL if either allocation fails (nothing is
 * leaked in that case).  Release with ofi_ep_list_destroy().
 */
static inline struct ofi_ep_list *
ofi_ep_list_create(size_t num_eps)
{
	struct ofi_ep_list *list;

	list = malloc(sizeof(*list));
	if (!list)
		return NULL;

	list->eps = NULL;
	list->num_eps = num_eps;

	if (num_eps) {
		/* calloc rejects num_eps * sizeof(ptr) overflow for us */
		list->eps = calloc(num_eps, sizeof(*list->eps));
		if (!list->eps) {
			free(list);
			return NULL;
		}
	}

	return list;
}

/*
 * Copy the first 'size' entries of src into dst.
 *
 * Both lists must have at least 'size' slots.
 */
static inline void
ofi_ep_list_copy(struct ofi_ep_list *src, struct ofi_ep_list *dst, size_t size)
{
	assert(src->num_eps >= size);
	assert(dst->num_eps >= size);

	for (size_t i = 0; i < size; i++)
		dst->eps[i] = src->eps[i];
}

/*
 * Copy the first src_size entries of src into dst, skipping every entry
 * equal to ep.  Kept entries are packed at the front of dst in their
 * original order.
 *
 * dst may be one slot shorter than src_size, since one entry is expected
 * to be skipped.  If ep is not present and dst is the shorter size, the
 * copy stops once dst is full instead of writing past its end.
 */
static inline void
ofi_ep_list_copy_all_but(struct ofi_ep_list *src, struct ofi_ep_list *dst,
			 size_t src_size, struct util_ep *ep)
{
	size_t i, j = 0;

	assert(src->num_eps >= src_size);
	assert(dst->num_eps + 1 >= src_size);

	for (i = 0; i < src_size; i++) {
		if (src->eps[i] == ep)
			continue;

		/* ep was not found and dst has no slot left */
		if (j >= dst->num_eps)
			break;

		dst->eps[j++] = src->eps[i];
	}
}

/*
 * Free a list and its pointer array, then set *list to NULL.
 *
 * The endpoints referenced by the array are not touched.  Passing NULL,
 * or a pointer to a NULL list, is a no-op.
 */
static inline void
ofi_ep_list_destroy(struct ofi_ep_list **list)
{
	if (!list || !*list)
		return;

	(*list)->num_eps = 0;
	free((*list)->eps);
	free(*list);
	*list = NULL;
}

/*
 * Placeholder hook for releasing per-endpoint resources.
 *
 * Currently a no-op: the endpoint argument is accepted but not used.
 */
static inline void
ofi_util_ep_free_resources(struct util_ep *ep)
{
	(void) ep;
}

/*
 * One pending-teardown record: pairs an endpoint array with the endpoint
 * whose resources are to be cleaned up along with it.
 * Records are created by ep_list_to_delete_insert() and walked by
 * ofi_cq_progress() / ofi_cq_cleanup().
 */
struct ofi_ep_list_to_delete {
/* List linkage; ep_list_to_delete_insert() appends the record with dlist_insert_tail(). */
struct dlist_entry entry;
/* Endpoint array that the walkers release with ofi_ep_list_destroy(). */
struct ofi_ep_list *ep_list;
/* Endpoint that the walkers pass to ofi_endpoint_clean_resources(). */
struct util_ep *ep_to_be_closed;
};

/*
 * Append a pending-teardown record to the tail of ep_list_to_delete.
 *
 * A zeroed struct ofi_ep_list_to_delete is allocated and filled with
 * ep_list and ep_to_be_closed before being linked in.
 *
 * Returns FI_SUCCESS, or -FI_ENOMEM if the record cannot be allocated
 * (the list is left untouched in that case).
 */
static inline int ep_list_to_delete_insert(struct dlist_entry *ep_list_to_delete, struct ofi_ep_list *ep_list, struct util_ep *ep_to_be_closed)
{
	struct ofi_ep_list_to_delete *pending = calloc(1, sizeof(*pending));

	if (pending == NULL)
		return -FI_ENOMEM;

	pending->ep_to_be_closed = ep_to_be_closed;
	pending->ep_list = ep_list;
	dlist_insert_tail(&pending->entry, ep_list_to_delete);

	return FI_SUCCESS;
}

/*
 * Per-endpoint cleanup routine; its definition is not shown here.
 * ofi_cq_cleanup() and ofi_cq_progress() call it on the ep_to_be_closed
 * member of each struct ofi_ep_list_to_delete record they walk.
 */
void ofi_endpoint_clean_resources(struct util_ep *util_ep);


static inline void* array_list_remove_item(struct util_ep *util_ep, struct util_cq *cq)
{
struct ofi_dyn_arr *tmp_ep_list;


if (cq->ep_list2->num_eps > 1) {
tmp_ep_list = ofi_ep_list_create(cq->ep_list2->num_eps - 1);
// Copy contents of old list to tmp_ep_list (without EP to remove)
ofi_ep_list_copy_all_but(cq->ep_list2, tmp_ep_list, cq->ep_list2->num_eps, util_ep);

// atomic swap tmp_ep_list and cq->ep_list

tmp_ep_list = cq->ep_list2;
} else {
tmp_ep_list = cq->ep_list2;
cq->ep_list2 = NULL;
}

return tmp_ep_list;
}

struct util_cq {
struct fid_cq cq_fid;
struct util_domain *domain;
struct util_wait *wait;
ofi_atomic32_t ref;
struct dlist_entry ep_list;
struct ofi_genlock ep_list_lock;


ofi_mutex_t cntrl_iface_lock;
struct ofi_ep_list *ep_list2;
struct dlist_entry ep_list_to_delete;


struct ofi_genlock cq_lock;
uint64_t flags;

Expand Down
40 changes: 20 additions & 20 deletions prov/rxd/src/rxd_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ static const char *rxd_cq_strerror(struct fid_cq *cq_fid, int prov_errno,

cq = container_of(cq_fid, struct rxd_cq, util_cq.cq_fid);

ofi_genlock_lock(&cq->util_cq.ep_list_lock);
assert(!dlist_empty(&cq->util_cq.ep_list));
fid_entry = container_of(cq->util_cq.ep_list.next,
struct fid_list_entry, entry);
util_ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
ep = container_of(util_ep, struct rxd_ep, util_ep);

str = fi_cq_strerror(ep->dg_cq, prov_errno, err_data, buf, len);
ofi_genlock_unlock(&cq->util_cq.ep_list_lock);
//ofi_genlock_lock(&cq->util_cq.ep_list_lock);
// assert(!dlist_empty(&cq->util_cq.ep_list));
// fid_entry = container_of(cq->util_cq.ep_list.next,
// struct fid_list_entry, entry);
// util_ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
// ep = container_of(util_ep, struct rxd_ep, util_ep);

// str = fi_cq_strerror(ep->dg_cq, prov_errno, err_data, buf, len);
//ofi_genlock_unlock(&cq->util_cq.ep_list_lock);
return str;
}

Expand Down Expand Up @@ -1257,17 +1257,17 @@ ssize_t rxd_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
}

ep_retry = -1;
ofi_genlock_lock(&cq->ep_list_lock);
dlist_foreach_container(&cq->ep_list, struct fid_list_entry,
fid_entry, entry) {
ep = container_of(fid_entry->fid, struct rxd_ep,
util_ep.ep_fid.fid);
if (ep->next_retry == -1)
continue;
ep_retry = ep_retry == -1 ? ep->next_retry :
MIN(ep_retry, ep->next_retry);
}
ofi_genlock_unlock(&cq->ep_list_lock);
//ofi_genlock_lock(&cq->ep_list_lock);
// dlist_foreach_container(&cq->ep_list, struct fid_list_entry,
// fid_entry, entry) {
// ep = container_of(fid_entry->fid, struct rxd_ep,
// util_ep.ep_fid.fid);
// if (ep->next_retry == -1)
// continue;
// ep_retry = ep_retry == -1 ? ep->next_retry :
// MIN(ep_retry, ep->next_retry);
// }
//ofi_genlock_unlock(&cq->ep_list_lock);

ret = fi_wait(&cq->wait->wait_fid, ep_retry == -1 ?
timeout : rxd_get_timeout(ep_retry));
Expand Down
2 changes: 1 addition & 1 deletion prov/rxm/src/rxm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rxm_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
struct fid_list_entry *fid_entry;

cq = container_of(cq_fid, struct util_cq, cq_fid);
fid_entry = container_of(cq->ep_list.next, struct fid_list_entry, entry);
// fid_entry = container_of(cq->ep_list.next, struct fid_list_entry, entry);
rxm_ep = container_of(fid_entry->fid, struct rxm_ep, util_ep.ep_fid);

return fi_cq_strerror(rxm_ep->msg_cq, prov_errno, err_data, buf, len);
Expand Down
6 changes: 3 additions & 3 deletions prov/shm/src/smr_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -879,9 +879,9 @@ static int smr_ep_bind_cq(struct smr_ep *ep, struct util_cq *cq, uint64_t flags)
return ret;
}

ret = fid_list_insert2(&cq->ep_list,
&cq->ep_list_lock,
&ep->util_ep.ep_fid.fid);
// ret = fid_list_insert2(&cq->ep_list,
// &cq->ep_list_lock,
// &ep->util_ep.ep_fid.fid);

return ret;
}
Expand Down
4 changes: 2 additions & 2 deletions prov/sm2/src/sm2_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ static int sm2_ep_bind_cq(struct sm2_ep *ep, struct util_cq *cq, uint64_t flags)
return ret;
}

ret = fid_list_insert2(&cq->ep_list, &cq->ep_list_lock,
&ep->util_ep.ep_fid.fid);
// ret = fid_list_insert2(&cq->ep_list, &cq->ep_list_lock,
// &ep->util_ep.ep_fid.fid);

return ret;
}
Expand Down
12 changes: 6 additions & 6 deletions prov/udp/src/udpx_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,9 @@ static int udpx_ep_close(struct fid *fid)
struct util_wait_fd, util_wait);
ofi_epoll_del(wait->epoll_fd, (int)ep->sock);
}
fid_list_remove2(&ep->util_ep.rx_cq->ep_list,
&ep->util_ep.rx_cq->ep_list_lock,
&ep->util_ep.ep_fid.fid);
// fid_list_remove2(&ep->util_ep.rx_cq->ep_list,
// &ep->util_ep.rx_cq->ep_list_lock,
// &ep->util_ep.ep_fid.fid);
}

udpx_rx_cirq_free(ep->rxq);
Expand Down Expand Up @@ -614,9 +614,9 @@ static int udpx_ep_bind_cq(struct udpx_ep *ep, struct util_cq *cq,
udpx_rx_src_comp : udpx_rx_comp;
}

ret = fid_list_insert2(&cq->ep_list,
&cq->ep_list_lock,
&ep->util_ep.ep_fid.fid);
// ret = fid_list_insert2(&cq->ep_list,
// &cq->ep_list_lock,
// &ep->util_ep.ep_fid.fid);
if (ret)
return ret;
}
Expand Down
67 changes: 56 additions & 11 deletions prov/util/src/util_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ static void util_peer_cq_cleanup(struct util_cq *cq)

int ofi_cq_cleanup(struct util_cq *cq)
{
struct dlist_entry *itr;
struct ofi_ep_list_to_delete *list_entry;

dlist_foreach(&cq->ep_list_to_delete, itr) {
list_entry = container_of(itr, struct ofi_ep_list_to_delete, entry);
ofi_ep_list_destroy(&list_entry->ep_list);
ofi_endpoint_clean_resources(list_entry->ep_to_be_closed);
}

if (ofi_atomic_get32(&cq->ref))
return -FI_EBUSY;

Expand All @@ -449,7 +458,7 @@ int ofi_cq_cleanup(struct util_cq *cq)
}

ofi_genlock_destroy(&cq->cq_lock);
ofi_genlock_destroy(&cq->ep_list_lock);
ofi_mutex_destroy(&cq->cntrl_iface_lock);
ofi_atomic_dec32(&cq->domain->ref);
return 0;
}
Expand Down Expand Up @@ -516,17 +525,49 @@ int ofi_check_bind_cq_flags(struct util_ep *ep, struct util_cq *cq,
void ofi_cq_progress(struct util_cq *cq)
{
struct util_ep *ep;
struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct dlist_entry *itr;
struct ofi_ep_list *ep_list_to_itr;
struct ofi_ep_list_to_delete *list_entry;
bool need_lock = cq->domain->threading == FI_THREAD_DOMAIN ||
cq->domain->threading == FI_THREAD_COMPLETION ?
false : true;


/* Grab ep list now in case it changes in control plane */
// TODO Make this atomic
ep_list_to_itr = cq->ep_list2;

if (need_lock) {
ofi_mutex_lock(&cq->cntrl_iface_lock);
}

if (OFI_UNLIKELY(!dlist_empty(&cq->ep_list_to_delete))) {
if (need_lock == false) {
ofi_mutex_lock(&cq->cntrl_iface_lock);
}

ofi_genlock_lock(&cq->ep_list_lock);
dlist_foreach(&cq->ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
dlist_foreach(&cq->ep_list_to_delete, itr) {
list_entry = container_of(itr, struct ofi_ep_list_to_delete, entry);
ofi_ep_list_destroy(&list_entry->ep_list);
ofi_endpoint_clean_resources(list_entry->ep_to_be_closed);
}

dlist_init(&cq->ep_list_to_delete);


if (need_lock == false) {
ofi_mutex_unlock(&cq->cntrl_iface_lock);
}
}

for (int i = 0; i < ep_list_to_itr->num_eps; i++) {
ep = ep_list_to_itr->eps[i];
ep->progress(ep);
}

if (need_lock) {
ofi_mutex_unlock(&cq->cntrl_iface_lock);
}
ofi_genlock_unlock(&cq->ep_list_lock);
}

static ssize_t util_peer_cq_write(struct fid_peer_cq *cq, void *context,
Expand Down Expand Up @@ -721,10 +762,14 @@ int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
cq->progress = progress;
cq->err_data = NULL;

cq->ep_list2 = malloc(sizeof(struct ofi_ep_list));
cq->ep_list2->num_eps = 0;
cq->ep_list2->eps = NULL;

cq->domain = container_of(domain, struct util_domain, domain_fid);
ofi_atomic_initialize32(&cq->ref, 0);
ofi_atomic_initialize32(&cq->wakeup, 0);
dlist_init(&cq->ep_list);
dlist_init(&cq->ep_list_to_delete);

if (cq->domain->threading == FI_THREAD_COMPLETION ||
cq->domain->threading == FI_THREAD_DOMAIN)
Expand All @@ -737,7 +782,7 @@ int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
return ret;

/* TODO Figure out how to optimize this lock for rdm and msg endpoints */
ret = ofi_genlock_init(&cq->ep_list_lock, OFI_LOCK_MUTEX);
ret = ofi_mutex_init(&cq->cntrl_iface_lock);
if (ret)
goto destroy1;

Expand Down Expand Up @@ -798,7 +843,7 @@ int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
cleanup:
util_peer_cq_cleanup(cq);
destroy2:
ofi_genlock_destroy(&cq->ep_list_lock);
ofi_mutex_destroy(&cq->cntrl_iface_lock);
destroy1:
ofi_genlock_destroy(&cq->cq_lock);
return ret;
Expand Down
Loading

0 comments on commit d6ef023

Please sign in to comment.