From 53145b5e6557f3827ecbdcd7fb22de38a5d2e2bf Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 30 Oct 2024 10:13:58 -0600 Subject: [PATCH] Updates based on PR feedback - Move the `uv_async_send` under our lock to avoid crash/race - Replace `uv_poll_stop` with `uv_close` and deal with nle->handle in place and not again in the final close callback. Signed-off-by: Ivan Kozlovic --- src/adapters/libuv.h | 75 +++++++++++++++++++------------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/src/adapters/libuv.h b/src/adapters/libuv.h index e5d29473..5f212e4b 100644 --- a/src/adapters/libuv.h +++ b/src/adapters/libuv.h @@ -130,9 +130,14 @@ uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add) nle->tail = newEvent; - uv_mutex_unlock(nle->lock); - + // We need to wake up the event loop thread under our lock because + // due to signal coalescing (and the reason we have a list), it is + // possible that the detach that we have just added is processed + // after we release the lock, freeing the `nle` structure. Calling + // `uv_async_send(nle->scheduler)` outside this lock would then + // cause a crash or race. res = uv_async_send(nle->scheduler); + uv_mutex_unlock(nle->lock); return (res == 0 ? NATS_OK : NATS_ERR); } @@ -158,11 +163,15 @@ natsLibuvPoll(uv_poll_t* handle, int status, int events) natsConnection_ProcessWriteEvent(nle->nc); } +static void +uvHandleClosedCb(uv_handle_t *handle) +{ + free(handle); +} + static natsStatus uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add) { - int res; - if (eventType == NATS_LIBUV_READ) { if (add) @@ -179,43 +188,30 @@ uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add) } if (nle->events) - res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll); - else { - res = uv_poll_stop(nle->handle); - if (res == 0) - { - // We have stopped polling for events for this socket and are in - // the event loop thread, so we invoke this so that the NATS C - // client library can proceed with closing the socket. - natsConnection_ProcessCloseEvent(&(nle->socket)); - } + int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll); + return (res == 0 ? NATS_OK : NATS_ERR); } - - if (res != 0) - return NATS_ERR; + // Both read and write events have been removed, this signal that the socket + // should be closed prior to a reconnect or during natsConnection_Close(). + uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb); + nle->handle = NULL; + // We have stopped polling for events for this socket and are in the event + // loop thread, so we invoke this so that the NATS C client library can + // proceed with closing the socket. + natsConnection_ProcessCloseEvent(&(nle->socket)); return NATS_OK; } -static void -uvHandleClosedCb(uv_handle_t *handle) -{ - free(handle); -} - static natsStatus uvAsyncAttach(natsLibuvEvents *nle) { natsStatus s = NATS_OK; - // We are reconnecting, destroy the old handle, create a new one - if (nle->handle != NULL) - { - uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb); - nle->handle = NULL; - } - + // Even when this is a reconnect, previous nle->handle has already been + // set to NULL (and the memory has or will be freed in uvHandleClosedCb), + // so recreate now. nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t)); if (nle->handle == NULL) s = NATS_NO_MEMORY; @@ -241,9 +237,9 @@ uvAsyncAttach(natsLibuvEvents *nle) } static void -finalCloseCb(uv_handle_t* handle) +uvFinalCloseCb(uv_handle_t* handle) { - natsLibuvEvents *nle = (natsLibuvEvents*)handle->data; + natsLibuvEvents *nle = (natsLibuvEvents*) handle->data; natsLibuvEvent *event; while ((event = nle->head) != NULL) @@ -251,25 +247,16 @@ finalCloseCb(uv_handle_t* handle) nle->head = event->next; free(event); } - free(nle->handle); free(nle->scheduler); uv_mutex_destroy(nle->lock); free(nle->lock); free(nle); } -static void -closeSchedulerCb(uv_handle_t* scheduler) -{ - natsLibuvEvents *nle = (natsLibuvEvents*) scheduler->data; - - uv_close((uv_handle_t*) nle->handle, finalCloseCb); -} - static void uvAsyncDetach(natsLibuvEvents *nle) { - uv_close((uv_handle_t*) nle->scheduler, closeSchedulerCb); + uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb); } static void @@ -317,6 +304,10 @@ uvAsyncCb(uv_async_t *handle) case NATS_LIBUV_DETACH: { uvAsyncDetach(nle); + // We want to make sure that we will exit this loop since by now + // the `nle` structure may have been freed. Regardless, this is + // supposed to be the last event for this `nle` object. + more = false; break; } default: