Skip to content

Commit

Permalink
Updates based on PR feedback
Browse files Browse the repository at this point in the history
- Move the `uv_async_send` under our lock to avoid crash/race
- Replace `uv_poll_stop` with `uv_close` and deal with nle->handle
in place and not again in the final close callback.

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Oct 30, 2024
1 parent 86df606 commit 53145b5
Showing 1 changed file with 33 additions and 42 deletions.
75 changes: 33 additions & 42 deletions src/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,14 @@ uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add)

nle->tail = newEvent;

uv_mutex_unlock(nle->lock);

// We need to wake up the event loop thread under our lock because
// due to signal coalescing (and the reason we have a list), it is
// possible that the detach that we have just added is processed
// after we release the lock, freeing the `nle` structure. Calling
// `uv_async_send(nle->scheduler)` outside this lock would then
// cause a crash or race.
res = uv_async_send(nle->scheduler);
uv_mutex_unlock(nle->lock);

return (res == 0 ? NATS_OK : NATS_ERR);
}
Expand All @@ -158,11 +163,15 @@ natsLibuvPoll(uv_poll_t* handle, int status, int events)
natsConnection_ProcessWriteEvent(nle->nc);
}

static void
uvHandleClosedCb(uv_handle_t *handle)
{
free(handle);
}

static natsStatus
uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
{
int res;

if (eventType == NATS_LIBUV_READ)
{
if (add)
Expand All @@ -179,43 +188,30 @@ uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
}

if (nle->events)
res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
else
{
res = uv_poll_stop(nle->handle);
if (res == 0)
{
// We have stopped polling for events for this socket and are in
// the event loop thread, so we invoke this so that the NATS C
// client library can proceed with closing the socket.
natsConnection_ProcessCloseEvent(&(nle->socket));
}
int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
return (res == 0 ? NATS_OK : NATS_ERR);
}

if (res != 0)
return NATS_ERR;
// Both read and write events have been removed, this signal that the socket
// should be closed prior to a reconnect or during natsConnection_Close().
uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
nle->handle = NULL;
// We have stopped polling for events for this socket and are in the event
// loop thread, so we invoke this so that the NATS C client library can
// proceed with closing the socket.
natsConnection_ProcessCloseEvent(&(nle->socket));

return NATS_OK;
}

static void
uvHandleClosedCb(uv_handle_t *handle)
{
free(handle);
}

static natsStatus
uvAsyncAttach(natsLibuvEvents *nle)
{
natsStatus s = NATS_OK;

// We are reconnecting, destroy the old handle, create a new one
if (nle->handle != NULL)
{
uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
nle->handle = NULL;
}

// Even when this is a reconnect, previous nle->handle has already been
// set to NULL (and the memory has or will be freed in uvHandleClosedCb),
// so recreate now.
nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t));
if (nle->handle == NULL)
s = NATS_NO_MEMORY;
Expand All @@ -241,35 +237,26 @@ uvAsyncAttach(natsLibuvEvents *nle)
}

static void
finalCloseCb(uv_handle_t* handle)
uvFinalCloseCb(uv_handle_t* handle)
{
natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
natsLibuvEvent *event;

while ((event = nle->head) != NULL)
{
nle->head = event->next;
free(event);
}
free(nle->handle);
free(nle->scheduler);
uv_mutex_destroy(nle->lock);
free(nle->lock);
free(nle);
}

static void
closeSchedulerCb(uv_handle_t* scheduler)
{
natsLibuvEvents *nle = (natsLibuvEvents*) scheduler->data;

uv_close((uv_handle_t*) nle->handle, finalCloseCb);
}

static void
uvAsyncDetach(natsLibuvEvents *nle)
{
uv_close((uv_handle_t*) nle->scheduler, closeSchedulerCb);
uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
}

static void
Expand Down Expand Up @@ -317,6 +304,10 @@ uvAsyncCb(uv_async_t *handle)
case NATS_LIBUV_DETACH:
{
uvAsyncDetach(nle);
// We want to make sure that we will exit this loop since by now
// the `nle` structure may have been freed. Regardless, this is
// supposed to be the last event for this `nle` object.
more = false;
break;
}
default:
Expand Down

0 comments on commit 53145b5

Please sign in to comment.