diff --git a/lib/src/zap/fabric/zap_fabric.c b/lib/src/zap/fabric/zap_fabric.c index 81347cdea..7552d8ed3 100644 --- a/lib/src/zap/fabric/zap_fabric.c +++ b/lib/src/zap/fabric/zap_fabric.c @@ -456,6 +456,8 @@ static void z_fi_destroy(zap_ep_t zep) DLOG("rep %p has %d ctxts\n", rep, rep->num_ctxts); + zap_io_thread_ep_remove(zep); + /* Do this first. */ while (!LIST_EMPTY(&rep->ep.map_list)) { map = (zap_map_t)LIST_FIRST(&rep->ep.map_list); diff --git a/lib/src/zap/rdma/zap_rdma.c b/lib/src/zap/rdma/zap_rdma.c index e27a3f8de..06a86bb2d 100644 --- a/lib/src/zap/rdma/zap_rdma.c +++ b/lib/src/zap/rdma/zap_rdma.c @@ -499,8 +499,10 @@ static void __rdma_teardown_conn(struct z_rdma_ep *ep) static void z_rdma_destroy(zap_ep_t zep) { struct z_rdma_ep *rep = (void*)zep; - if (zep->thread) + if (zep->thread) { zap_io_thread_ep_release(zep); + zap_io_thread_ep_remove(zep); + } pthread_mutex_lock(&rep->ep.lock); __rdma_teardown_conn(rep); pthread_mutex_unlock(&rep->ep.lock); diff --git a/lib/src/zap/sock/zap_sock.c b/lib/src/zap/sock/zap_sock.c index a22be58cc..008a5eca9 100644 --- a/lib/src/zap/sock/zap_sock.c +++ b/lib/src/zap/sock/zap_sock.c @@ -1992,6 +1992,8 @@ static void z_sock_destroy(zap_ep_t ep) DEBUG_LOG(sep, "%ld z_sock_destroy(%p)\n", GETTID(), sep); + zap_io_thread_ep_remove(ep); + while (!TAILQ_EMPTY(&sep->sq)) { wr = TAILQ_FIRST(&sep->sq); TAILQ_REMOVE(&sep->sq, wr, link); diff --git a/lib/src/zap/ugni/zap_ugni.c b/lib/src/zap/ugni/zap_ugni.c index 4fd759be9..6e0b7419d 100644 --- a/lib/src/zap/ugni/zap_ugni.c +++ b/lib/src/zap/ugni/zap_ugni.c @@ -2235,6 +2235,9 @@ static void z_ugni_destroy(zap_ep_t ep) { struct z_ugni_ep *uep = (void*)ep; CONN_LOG("destroying endpoint %p\n", uep); + + zap_io_thread_ep_remove(ep); + pthread_mutex_lock(&z_ugni_list_mutex); ZUGNI_LIST_REMOVE(uep, link); pthread_mutex_unlock(&z_ugni_list_mutex); diff --git a/lib/src/zap/zap.c b/lib/src/zap/zap.c index f2f3ea938..f169a3124 100644 --- a/lib/src/zap/zap.c +++ b/lib/src/zap/zap.c @@ -910,6 +910,14 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi) zap_err_t zap_io_thread_ep_release(zap_ep_t ep) { zap_err_t zerr; + zerr = ep->z->io_thread_ep_release(ep->thread, ep); + __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); + return zerr; +} + +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep) +{ + zap_err_t zerr = 0; zap_io_thread_t t = ep->thread; pthread_mutex_lock(&t->mutex); @@ -917,8 +925,6 @@ zap_err_t zap_io_thread_ep_release(zap_ep_t ep) t->_n_ep--; t->stat->n_eps = t->_n_ep; pthread_mutex_unlock(&t->mutex); - zerr = ep->z->io_thread_ep_release(ep->thread, ep); - __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); ep->thread = NULL; return zerr; } diff --git a/lib/src/zap/zap_priv.h b/lib/src/zap/zap_priv.h index c699fd1a0..dce830d24 100644 --- a/lib/src/zap/zap_priv.h +++ b/lib/src/zap/zap_priv.h @@ -483,9 +483,34 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi); * The transport shall call this function to release an endpoint from the * associated io thread. \c zap.io_thread_ep_release() will also be called as a * subsequence. + * + * Consequently, the endpoint will not be processed by the thread any further. + * However, the endpoint still keeps a reference to the thread for further statistics data access. + * The endpoint references to the thread is removed when \c zap_io_thread_ep_remove() is called. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_remove */ zap_err_t zap_io_thread_ep_release(zap_ep_t ep); +/** + * Remove \c ep reference to the zap io thread. + * + * The function nullifies the endpoint reference to the thread and decrements + * the number of endpoints associated to the thread. This results in reducing + * the thread's load counter. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_release + */ +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep); + /* * The zap_thrstat structure maintains state for * the Zap thread utilization tracking functions.