Skip to content

Commit

Permalink
[FIXED] EventLoop: Socket now closed only after event loop done polling
Browse files Browse the repository at this point in the history
The socket was closed by the NATS library itself, which could cause
issues if the event loop thread was still polling it.
We now defer to the event loop adapter, which makes sure that the event
loop library is done polling before invoking a new function
(natsConnection_ProcessCloseEvent) that takes care of closing the socket.

I have updated the event loop test, which simulates what our adapters
do. The mock event loop implementation is a bit simplistic, but it
should be OK for now. If we run into issues, we will have to turn
the events into a linked list.

Resolves #814

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Oct 23, 2024
1 parent 7b5d3d7 commit 86df606
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 33 deletions.
20 changes: 20 additions & 0 deletions src/adapters/libevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so
return s;
}

// One-shot libevent callback used to finish closing a connection's socket.
//
// It is scheduled once polling for the "READ" event has been removed, so by
// the time it runs we are in the event loop thread and the socket is no
// longer being polled. Only `fd` is used; `event` and `arg` are ignored.
static void
_closeCb(evutil_socket_t fd, short event, void *arg)
{
    // Work on a local copy of the descriptor: the NATS C client library
    // takes a pointer to the socket so that it can close it.
    natsSock sockToClose = (natsSock) fd;

    natsConnection_ProcessCloseEvent(&sockToClose);
}

/** \brief Start or stop polling on READ events.
*
* This callback is invoked to notify that the event library should start
Expand All @@ -175,7 +186,16 @@ natsLibevent_Read(void *userData, bool add)
if (add)
res = event_add(nle->read, NULL);
else
{
int socket = event_get_fd(nle->read);
res = event_del_noblock(nle->read);
if (res == 0)
{
// This will schedule a one-time event that guarantees that the
// callback `_closeCb` will be invoked from the event loop thread.
res = event_base_once(nle->loop, socket, EV_TIMEOUT, _closeCb, (void*) nle, NULL);
}
}

return (res == 0 ? NATS_OK : NATS_ERR);
}
Expand Down
9 changes: 9 additions & 0 deletions src/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,16 @@ uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
if (nle->events)
res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
else
{
res = uv_poll_stop(nle->handle);
if (res == 0)
{
// We have stopped polling for events for this socket and are in
// the event loop thread, so we invoke this so that the NATS C
// client library can proceed with closing the socket.
natsConnection_ProcessCloseEvent(&(nle->socket));
}
}

if (res != 0)
return NATS_ERR;
Expand Down
60 changes: 45 additions & 15 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2146,9 +2146,21 @@ _evStopPolling(natsConnection *nc)

nc->sockCtx.useEventLoop = false;
nc->el.writeAdded = false;
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
// The "write" event is added and removed as we write, however, we always
// have the "read" event added to the event loop. Removing it signals that
// the connection is closed and so the event loop adapter can then invoke
// natsConnection_ProcessCloseEvent() when the event loop is done polling
// the event. So we will remove "write" first, then finish with "read".
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
{
// We can't close the socket here, but we will mark as invalid and
// clear SSL object if applicable.
nc->sockCtx.fd = NATS_SOCK_INVALID;
_clearSSL(nc);
}

return s;
}
Expand Down Expand Up @@ -2187,6 +2199,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
SET_WRITE_DEADLINE(nc);
natsConn_bufferFlush(nc);

// Shutdown the socket to stop any read/write operations.
natsSock_Shutdown(nc->sockCtx.fd);
nc->sockCtx.fdActive = false;
}
Expand All @@ -2195,12 +2208,10 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
// on the socket since we are going to reconnect.
if (nc->el.attached)
{
// This will take care of invalidating the socket and clear SSL,
// but the actual socket close will be done from the event loop
// adapter by calling natsConnection_ProcessCloseEvent().
ls = _evStopPolling(nc);
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}

// Fail pending flush requests.
Expand Down Expand Up @@ -2579,13 +2590,20 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
{
// If event loop attached, stop polling...
if (nc->el.attached)
{
// This will take care of invalidating the socket and clear SSL,
// but the actual socket close will be done from the event loop
// adapter by calling natsConnection_ProcessCloseEvent().
_evStopPolling(nc);
}
else
{
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}
}
else
{
Expand Down Expand Up @@ -3411,6 +3429,7 @@ natsConnection_Reconnect(natsConnection *nc)
natsSock_Shutdown(nc->sockCtx.fd);

natsConn_Unlock(nc);

return NATS_OK;
}

Expand Down Expand Up @@ -4098,13 +4117,16 @@ natsConnection_ProcessReadEvent(natsConnection *nc)
buffer = nc->el.buffer;
size = nc->opts->ioBufSize;

natsConn_Unlock(nc);

// Do not try to read again here on success. If more than one connection
// is attached to the same loop, and there is a constant stream of data
// coming for the first connection, this would starve the second connection.
// So return and we will be called back later by the event loop.

// This needs to be protected by the connection lock. We are here because
// there is a read event, so we will gather some data in natsSock_Read()
// but not wait there.
s = natsSock_Read(&(nc->sockCtx), buffer, size, &n);
natsConn_Unlock(nc);
if (s == NATS_OK)
s = natsParser_Parse(nc, buffer, n);

Expand Down Expand Up @@ -4159,8 +4181,16 @@ natsConnection_ProcessWriteEvent(natsConnection *nc)

if (s != NATS_OK)
_processOpError(nc, s, false);
}

// Closes the socket on behalf of an external event loop adapter.
//
// `socket` points to the socket to close. If the pointer is NULL, or the
// socket it points to is already NATS_SOCK_INVALID, this is a no-op.
// Otherwise the socket is closed and the pointed-to value is reset to
// NATS_SOCK_INVALID, so calling this again with the same pointer does
// nothing.
void
natsConnection_ProcessCloseEvent(natsSock *socket)
{
    // Nothing to close.
    if ((socket == NULL) || (*socket == NATS_SOCK_INVALID))
        return;

    natsSock_Close(*socket);
    // Invalidate the caller's copy so the socket cannot be closed twice.
    *socket = NATS_SOCK_INVALID;
}

natsStatus
Expand Down
14 changes: 14 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,20 @@ natsConnection_Reconnect(natsConnection *nc);
NATS_EXTERN void
natsConnection_ProcessReadEvent(natsConnection *nc);

/** \brief Process a socket close event when using external event loop.
 *
 * When using an external event loop, and the library wants to close
 * the connection, the event loop adapter will ensure that the event
 * loop library stops polling, and then will invoke this function
 * so that the socket can be safely closed.
 *
 * If `socket` is `NULL`, or if the socket it points to is already
 * invalid, this function does nothing. Otherwise, the socket is closed
 * and the value pointed to by `socket` is set to the invalid socket value,
 * so that a subsequent call with the same pointer is a no-op.
 *
 * @param socket the pointer to the #natsSock object.
 *
 * \warning This API is reserved for external event loop adapters.
 */
NATS_EXTERN void
natsConnection_ProcessCloseEvent(natsSock *socket);

/** \brief Process a write event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
Loading

0 comments on commit 86df606

Please sign in to comment.