Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[0.x] Optimize NACKs handling #3471

Draft
wants to merge 3 commits into
base: 0.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions fuzzers/rtcp_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
janus_rtcp_remove_nacks((char *)copy_data[idx++], size);
/* Functions that allocate new memory */
char *output_data = janus_rtcp_filter((char *)data, size, &newlen);
GSList *list = janus_rtcp_get_nacks((char *)data, size);
GQueue *queue = g_queue_new();
janus_rtcp_get_nacks((char *)data, size, queue);

/* Free resources */
g_free(output_data);
if (list) g_slist_free(list);
g_queue_free(queue);
return 0;
}
23 changes: 14 additions & 9 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,9 @@ static void janus_ice_component_free(const janus_refcount *component_ref) {
g_queue_free(component->video_retransmit_buffer);
g_hash_table_destroy(component->video_retransmit_seqs);
}
if(component->nacks_queue != NULL) {
g_queue_free(component->nacks_queue);
}
if(component->candidates != NULL) {
GSList *i = NULL, *candidates = component->candidates;
for(i = candidates; i; i = i->next) {
Expand Down Expand Up @@ -3310,17 +3313,18 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp

/* Now let's see if there are any NACKs to handle */
gint64 now = janus_get_monotonic_time();
GSList *nacks = janus_rtcp_get_nacks(buf, buflen);
guint nacks_count = g_slist_length(nacks);
GQueue *nacks = component->nacks_queue;
janus_rtcp_get_nacks(buf, buflen, nacks);
guint nacks_count = g_queue_get_length(nacks);
if(nacks_count && ((!video && component->do_audio_nacks) || (video && component->do_video_nacks))) {
/* Handle NACK */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Just got some NACKS (%d) we should handle...\n", handle->handle_id, nacks_count);
GHashTable *retransmit_seqs = (video ? component->video_retransmit_seqs : component->audio_retransmit_seqs);
GSList *list = (retransmit_seqs != NULL ? nacks : NULL);
GQueue *queue = (retransmit_seqs != NULL ? nacks : NULL);
int retransmits_cnt = 0;
janus_mutex_lock(&component->mutex);
while(list) {
unsigned int seqnr = GPOINTER_TO_UINT(list->data);
while(g_queue_get_length(queue) > 0) {
unsigned int seqnr = GPOINTER_TO_UINT(g_queue_pop_tail(queue));
JANUS_LOG(LOG_DBG, "[%"SCNu64"] >> %u\n", handle->handle_id, seqnr);
int in_rb = 0;
/* Check if we have the packet */
Expand All @@ -3331,7 +3335,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
/* Should we retransmit this packet? */
if((p->last_retransmit > 0) && (now-p->last_retransmit < MAX_NACK_IGNORE)) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Packet %u was retransmitted just %"SCNi64"ms ago, skipping\n", handle->handle_id, seqnr, now-p->last_retransmit);
list = list->next;
g_queue_pop_tail(queue);
continue;
}
in_rb = 1;
Expand Down Expand Up @@ -3377,7 +3381,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
if(rtcp_ctx != NULL && in_rb) {
g_atomic_int_inc(&rtcp_ctx->nack_count);
}
list = list->next;
g_queue_pop_tail(queue);
}
component->retransmit_recent_cnt += retransmits_cnt;
/* FIXME Remove the NACK compound packet, we've handled it */
Expand All @@ -3389,8 +3393,6 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
component->in_stats.audio.nacks += nacks_count;
}
janus_mutex_unlock(&component->mutex);
g_slist_free(nacks);
nacks = NULL;
}
if(component->retransmit_recent_cnt &&
now - component->retransmit_log_ts > 5*G_USEC_PER_SEC) {
Expand Down Expand Up @@ -5273,6 +5275,9 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
/* Insert in the table too, for quick lookup */
g_hash_table_insert(component->video_retransmit_seqs, GUINT_TO_POINTER(seq), p);
}
if(component->nacks_queue == NULL) {
component->nacks_queue = g_queue_new();
}
} else {
janus_ice_free_rtp_packet(p);
}
Expand Down
2 changes: 2 additions & 0 deletions ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ struct janus_ice_component {
GQueue *audio_retransmit_buffer, *video_retransmit_buffer;
/*! \brief HashTable of retransmittable sequence numbers, in case we receive NACKs */
GHashTable *audio_retransmit_seqs, *video_retransmit_seqs;
/*! \brief Helper queues for storing requested packets from NACKs */
GQueue *nacks_queue;
/*! \brief Current sequence number for the RFC4588 rtx SSRC session */
guint16 rtx_seq_number;
/*! \brief Last time a log message about sending retransmits was printed */
Expand Down
20 changes: 8 additions & 12 deletions rtcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1195,12 +1195,11 @@ gboolean janus_rtcp_has_pli(char *packet, int len) {
return FALSE;
}

GSList *janus_rtcp_get_nacks(char *packet, int len) {
if(packet == NULL || len == 0)
return NULL;
void janus_rtcp_get_nacks(char *packet, int len, GQueue *nacks_queue) {
if(packet == NULL || len == 0 || nacks_queue == NULL)
return;
janus_rtcp_header *rtcp = (janus_rtcp_header *)packet;
/* FIXME Get list of sequence numbers we should send again */
GSList *list = NULL;
g_queue_clear(nacks_queue);
int total = len;
gboolean error = FALSE;
while(rtcp) {
Expand Down Expand Up @@ -1232,13 +1231,13 @@ GSList *janus_rtcp_get_nacks(char *packet, int len) {
for(i=0; i< nacks; i++) {
nack = (janus_rtcp_nack *)rtcpfb->fci + i;
pid = ntohs(nack->pid);
list = g_slist_prepend(list, GUINT_TO_POINTER(pid));
g_queue_push_head(nacks_queue, GUINT_TO_POINTER(pid));
blp = ntohs(nack->blp);
memset(bitmask, 0, 20);
for(j=0; j<16; j++) {
bitmask[j] = (blp & ( 1 << j )) >> j ? '1' : '0';
if((blp & ( 1 << j )) >> j)
list = g_slist_prepend(list, GUINT_TO_POINTER(pid+j+1));
g_queue_push_head(nacks_queue, GUINT_TO_POINTER(pid+j+1));
}
bitmask[16] = '\n';
JANUS_LOG(LOG_DBG, "[%d] %"SCNu16" / %s\n", i, pid, bitmask);
Expand All @@ -1256,12 +1255,9 @@ GSList *janus_rtcp_get_nacks(char *packet, int len) {
break;
rtcp = (janus_rtcp_header *)((uint32_t*)rtcp + length + 1);
}
if (error && list) {
g_slist_free(list);
list = NULL;
if (error) {
g_queue_clear(nacks_queue);
}
list = g_slist_reverse(list);
return list;
}

int janus_rtcp_remove_nacks(char *packet, int len) {
Expand Down
7 changes: 3 additions & 4 deletions rtcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ uint32_t janus_rtcp_context_get_out_media_link_quality(janus_rtcp_context *ctx);
/*! \brief Method to swap Report Blocks and move media RB in first position in case rtx SSRC comes first
* @param[in] packet The message data
* @param[in] len The message data length in bytes
* @param[in] rtx_ssrc The rtx SSRC
* @returns The receiver SSRC, or 0 in case of error */
* @param[in] rtx_ssrc The rtx SSRC */
void janus_rtcp_swap_report_blocks(char *packet, int len, uint32_t rtx_ssrc);
/*! \brief Method to quickly retrieve the sender SSRC (needed for demuxing RTCP in BUNDLE)
* @param[in] packet The message data
Expand Down Expand Up @@ -451,8 +450,8 @@ gboolean janus_rtcp_has_pli(char *packet, int len);
/*! \brief Method to parse an RTCP NACK message
* @param[in] packet The message data
* @param[in] len The message data length in bytes
* @returns A list of janus_nack elements containing the sequence numbers to send again */
GSList *janus_rtcp_get_nacks(char *packet, int len);
* @param[in,out] nacks_queue The queue containing the sequence numbers to send again */
void janus_rtcp_get_nacks(char *packet, int len, GQueue *nacks_queue);

/*! \brief Method to remove an RTCP NACK message
* @param[in] packet The message data
Expand Down