Skip to content

Commit

Permalink
MT#55283 honour edge-triggered epoll
Browse files Browse the repository at this point in the history
We need to loop over the read(fd) as we're calling epoll using
edge-triggered semantics.

Reported in #1676

Change-Id: I8f36b76c1ab32cf5c97b3dff1acf4e3e081ea33e
  • Loading branch information
rfuchs committed Jun 13, 2023
1 parent 5d75460 commit 670f117
Showing 1 changed file with 33 additions and 36 deletions.
69 changes: 33 additions & 36 deletions recording-daemon/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,45 +47,42 @@ static void stream_handler(handler_t *handler) {

//dbg("poll event for %s", stream->name);

pthread_mutex_lock(&stream->lock);

if (stream->fd == -1)
goto out;

buf = malloc(ALLOCLEN);
int ret = read(stream->fd, buf, MAXBUFLEN);
if (ret == 0) {
ilog(LOG_INFO, "EOF on stream %s", stream->name);
stream_close(stream);
goto out;
}
else if (ret < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
goto out;
ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno));
stream_close(stream);
goto out;
}

// got a packet
pthread_mutex_unlock(&stream->lock);

if (forward_to){
if (forward_packet(stream->metafile,buf,ret)) // leaves buf intact
g_atomic_int_inc(&stream->metafile->forward_failed);
while (true) {
pthread_mutex_lock(&stream->lock);

if (stream->fd == -1)
break;

buf = malloc(ALLOCLEN);
int ret = read(stream->fd, buf, MAXBUFLEN);
if (ret == 0) {
ilog(LOG_INFO, "EOF on stream %s", stream->name);
stream_close(stream);
break;
}
else if (ret < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
break;
ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno));
stream_close(stream);
break;
}

// got a packet
pthread_mutex_unlock(&stream->lock);

if (forward_to){
if (forward_packet(stream->metafile,buf,ret)) // leaves buf intact
g_atomic_int_inc(&stream->metafile->forward_failed);
else
g_atomic_int_inc(&stream->metafile->forward_count);
}
if (decoding_enabled)
packet_process(stream, buf, ret); // consumes buf
else
g_atomic_int_inc(&stream->metafile->forward_count);
free(buf);
}
if (decoding_enabled)
packet_process(stream, buf, ret); // consumes buf
else
free(buf);

log_info_call = NULL;
log_info_stream = NULL;
return;

out:
pthread_mutex_unlock(&stream->lock);
free(buf);
log_info_call = NULL;
Expand Down

0 comments on commit 670f117

Please sign in to comment.