From 71d1f6e0fb8b85eeec7f25598603e0f0b6022112 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Mon, 27 Nov 2023 22:22:17 -0600 Subject: [PATCH] Modify the releasing Zap endpoints from IO thread process The patch separates the process of reducing the number of Zap endpoints associated with an IO thread from the process of releasing the endpoint from the IO thread. Additionally, it defers the nullification of `zap->thread` until the endpoint reference reaches zero. `zap_sock` and `zap_ugni` need to remove their events from epoll before delivering the disconnected event to applications, preventing the reception of additional epoll events. However, without this patch, this results in resetting `zap->thread` to NULL. Thus, the applications cannot retrive Zap thread information in the disconnected path. The patch addresses this limitation. --- lib/src/zap/fabric/zap_fabric.c | 2 ++ lib/src/zap/rdma/zap_rdma.c | 4 +++- lib/src/zap/sock/zap_sock.c | 2 ++ lib/src/zap/ugni/zap_ugni.c | 3 +++ lib/src/zap/zap.c | 10 ++++++++-- lib/src/zap/zap_priv.h | 25 +++++++++++++++++++++++++ 6 files changed, 43 insertions(+), 3 deletions(-) diff --git a/lib/src/zap/fabric/zap_fabric.c b/lib/src/zap/fabric/zap_fabric.c index 81347cdea..7552d8ed3 100644 --- a/lib/src/zap/fabric/zap_fabric.c +++ b/lib/src/zap/fabric/zap_fabric.c @@ -456,6 +456,8 @@ static void z_fi_destroy(zap_ep_t zep) DLOG("rep %p has %d ctxts\n", rep, rep->num_ctxts); + zap_io_thread_ep_remove(zep); + /* Do this first. */ while (!LIST_EMPTY(&rep->ep.map_list)) { map = (zap_map_t)LIST_FIRST(&rep->ep.map_list); diff --git a/lib/src/zap/rdma/zap_rdma.c b/lib/src/zap/rdma/zap_rdma.c index e27a3f8de..06a86bb2d 100644 --- a/lib/src/zap/rdma/zap_rdma.c +++ b/lib/src/zap/rdma/zap_rdma.c @@ -499,8 +499,10 @@ static void __rdma_teardown_conn(struct z_rdma_ep *ep) static void z_rdma_destroy(zap_ep_t zep) { struct z_rdma_ep *rep = (void*)zep; - if (zep->thread) + if (zep->thread) { zap_io_thread_ep_release(zep); + zap_io_thread_ep_remove(zep); + } pthread_mutex_lock(&rep->ep.lock); __rdma_teardown_conn(rep); pthread_mutex_unlock(&rep->ep.lock); diff --git a/lib/src/zap/sock/zap_sock.c b/lib/src/zap/sock/zap_sock.c index a22be58cc..008a5eca9 100644 --- a/lib/src/zap/sock/zap_sock.c +++ b/lib/src/zap/sock/zap_sock.c @@ -1992,6 +1992,8 @@ static void z_sock_destroy(zap_ep_t ep) DEBUG_LOG(sep, "%ld z_sock_destroy(%p)\n", GETTID(), sep); + zap_io_thread_ep_remove(ep); + while (!TAILQ_EMPTY(&sep->sq)) { wr = TAILQ_FIRST(&sep->sq); TAILQ_REMOVE(&sep->sq, wr, link); diff --git a/lib/src/zap/ugni/zap_ugni.c b/lib/src/zap/ugni/zap_ugni.c index 4fd759be9..6e0b7419d 100644 --- a/lib/src/zap/ugni/zap_ugni.c +++ b/lib/src/zap/ugni/zap_ugni.c @@ -2235,6 +2235,9 @@ static void z_ugni_destroy(zap_ep_t ep) { struct z_ugni_ep *uep = (void*)ep; CONN_LOG("destroying endpoint %p\n", uep); + + zap_io_thread_ep_remove(ep); + pthread_mutex_lock(&z_ugni_list_mutex); ZUGNI_LIST_REMOVE(uep, link); pthread_mutex_unlock(&z_ugni_list_mutex); diff --git a/lib/src/zap/zap.c b/lib/src/zap/zap.c index f2f3ea938..f169a3124 100644 --- a/lib/src/zap/zap.c +++ b/lib/src/zap/zap.c @@ -910,6 +910,14 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi) zap_err_t zap_io_thread_ep_release(zap_ep_t ep) { zap_err_t zerr; + zerr = ep->z->io_thread_ep_release(ep->thread, ep); + __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); + return zerr; +} + +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep) +{ + zap_err_t zerr = 0; zap_io_thread_t t = ep->thread; pthread_mutex_lock(&t->mutex); @@ -917,8 +925,6 @@ zap_err_t zap_io_thread_ep_release(zap_ep_t ep) t->_n_ep--; t->stat->n_eps = t->_n_ep; pthread_mutex_unlock(&t->mutex); - zerr = ep->z->io_thread_ep_release(ep->thread, ep); - __atomic_fetch_sub(&ep->thread->stat->sq_sz, ep->sq_sz, __ATOMIC_SEQ_CST); ep->thread = NULL; return zerr; } diff --git a/lib/src/zap/zap_priv.h b/lib/src/zap/zap_priv.h index c699fd1a0..dce830d24 100644 --- a/lib/src/zap/zap_priv.h +++ b/lib/src/zap/zap_priv.h @@ -483,9 +483,34 @@ zap_err_t zap_io_thread_ep_assign(zap_ep_t ep, int tpi); * The transport shall call this function to release an endpoint from the * associated io thread. \c zap.io_thread_ep_release() will also be called as a * subsequence. + * + * Consequently, the endpoint will not be processed by the thread any further. + * However, the endpoint still keeps a reference to the thread for further statistics data access. + * The endpoint references to the thread is removed when \c zap_io_thread_ep_remove() is called. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_remove */ zap_err_t zap_io_thread_ep_release(zap_ep_t ep); +/** + * Remove \c ep reference to the zap io thread. + * + * The function nullifies the endpoint reference to the thread and decrements + * the number of endpoints associated to the thread. This results in reducing + * the thread's load counter. + * + * \param ep A Zap endpoint + * + * \return ZAP_ERR_OK on success. Otherwise, a Zap error code is returned. + * + * \see zap_io_thread_ep_release + */ +zap_err_t zap_io_thread_ep_remove(zap_ep_t ep); + /* * The zap_thrstat structure maintains state for * the Zap thread utilization tracking functions.