Skip to content

Commit

Permalink
Refactor TWCC (#1934)
Browse files Browse the repository at this point in the history
* Remove stackqueue usage

* Use hashtable instead..working logic

* Cleanup, increase hash table size, fix loop bug

* method 2, array of pointers

* Add rolling window

* rolling window with hashtable

* hash table with rw

* Fix bug

* Fix twcc unit test

* Cleanup rw logic

* Cleanup

* Cleanup logic

* Update README

* unused var fix

* Use defines for hash table size

* Address comments, disable TWCC by default

* readme

* Fix windows gst issue

* Comments
  • Loading branch information
disa6302 authored and stefankiesz committed Nov 5, 2024
1 parent 0d45715 commit afcf43e
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 90 deletions.
2 changes: 2 additions & 0 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
// Set the ICE mode explicitly
configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL;

configuration.kvsRtcConfiguration.enableIceStats = pSampleConfiguration->enableIceStats;
configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE;
// Set the STUN server
PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
Expand Down
131 changes: 97 additions & 34 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE);
pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager));
CHK(pKvsPeerConnection->pTwccManager != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH,
&pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
}

*ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection;
Expand Down Expand Up @@ -1041,10 +1043,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsPeerConnection pKvsPeerConnection;
PKvsPeerConnection pKvsPeerConnection = NULL;
PDoubleListNode pCurNode = NULL;
UINT64 item = 0;
UINT64 startTime;
UINT32 twccHashTableCount = 0;
BOOL twccLocked = FALSE;

CHK(ppPeerConnection != NULL, STATUS_NULL_ARG);

Expand Down Expand Up @@ -1113,12 +1117,21 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
}

if (pKvsPeerConnection->pTwccManager != NULL) {
MUTEX_LOCK(pKvsPeerConnection->twccLock);
twccLocked = TRUE;
if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) {
DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount);
}
CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry));
CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE;
}
// twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets)
// we should not deallocate items but we do need to clear the queue
CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE));
SAFE_MEMFREE(pKvsPeerConnection->pTwccManager);
}

Expand All @@ -1127,8 +1140,17 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)

PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection");
SAFE_MEMFREE(*ppPeerConnection);
ppPeerConnection = NULL;
CleanUp:

if (ppPeerConnection != NULL) {
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
}
}
LEAVES();
return retStatus;
}
Expand Down Expand Up @@ -1826,47 +1848,88 @@ STATUS deinitKvsWebRtc(VOID)
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket)
// Not thread safe. Ensure this function is invoked in a guarded section
static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT16 updatedSeqNum = 0;
PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL;
UINT64 ageOfOldest = 0, firstRtpTime = 0;
UINT64 twccPacketValue = 0;
BOOL isCheckComplete = FALSE;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG);

updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow;
do {
// If the seqNum is not present in the hash table, it is ok. We move on to the next
if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) {
tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue;
}
if (tempTwccRtpPktInfo != NULL) {
firstRtpTime = tempTwccRtpPktInfo->localTimeKvs;
// Would be the case if the timestamps are not monotonically increasing.
if (pRtpPacket->sentTime >= firstRtpTime) {
ageOfOldest = pRtpPacket->sentTime - firstRtpTime;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
// If the seqNum is not present in the hash table, move on. However, this case should not happen
// given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists
if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) {
SAFE_MEMFREE(tempTwccRtpPktInfo);
}
updatedSeqNum++;
} else {
isCheckComplete = TRUE;
}
} else {
// Move to the next seqNum to check if we can remove the next one atleast
DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64,
updatedSeqNum, firstRtpTime, pRtpPacket->sentTime);
updatedSeqNum++;
}
} else {
updatedSeqNum++;
}
// reset before next iteration
tempTwccRtpPktInfo = NULL;
} while (!isCheckComplete && updatedSeqNum != (seqNum + 1));

// Update regardless. The loop checks until current RTP packets seq number irrespective of the failure
pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum;
CleanUp:
LEAVES();
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
UINT64 sn = 0;
UINT16 seqNum;
BOOL isEmpty = FALSE;
INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest;
CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS);
UINT16 seqNum = 0;
PTwccRtpPacketInfo pTwccRtpPktInfo = NULL;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS);
CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS);

MUTEX_LOCK(pc->twccLock);
MUTEX_LOCK(pKvsPeerConnection->twccLock);
locked = TRUE;

CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY);

pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength;
pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime;
pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME;
seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload);
CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum));
pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum;
pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength;
pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime;
pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME;
pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime;

// cleanup queue until it contains up to 2 seconds of sent packets
do {
CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn));
firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs;
lastLocalTimeKvs = pRtpPacket->sentTime;
ageOfOldest = lastLocalTimeKvs - firstTimeKvs;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn));
CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty));
} else {
break;
}
} while (!isEmpty);
CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo));

// Ensure twccRollingWindowDeletion is run in a guarded section
CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum));
CleanUp:
if (locked) {
MUTEX_UNLOCK(pc->twccLock);
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
}
CHK_LOG_ERR(retStatus);

Expand Down
18 changes: 10 additions & 8 deletions src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern "C" {
#define CODEC_HASH_TABLE_BUCKET_LENGTH 2
#define RTX_HASH_TABLE_BUCKET_COUNT 50
#define RTX_HASH_TABLE_BUCKET_LENGTH 2
#define TWCC_HASH_TABLE_BUCKET_COUNT 100
#define TWCC_HASH_TABLE_BUCKET_LENGTH 2

#define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200
#define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2
Expand All @@ -47,17 +49,16 @@ typedef enum {
} RTX_CODEC;

typedef struct {
UINT16 seqNum;
UINT16 packetSize;
UINT64 localTimeKvs;
UINT64 remoteTimeKvs;
} TwccPacket, *PTwccPacket;
UINT32 packetSize;
} TwccRtpPacketInfo, *PTwccRtpPacketInfo;

typedef struct {
StackQueue twccPackets;
TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality
UINT64 lastLocalTimeKvs;
UINT16 lastReportedSeqNum;
PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket]
UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window
UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response
UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response
} TwccManager, *PTwccManager;

typedef struct {
Expand All @@ -70,7 +71,7 @@ typedef struct {

typedef struct {
RtcPeerConnection peerConnection;
// UINT32 padding padding makes transportWideSequenceNumber 64bit aligned
// UINT32 padding makes transportWideSequenceNumber 64bit aligned
// we put atomics at the top of structs because customers application could set the packing to 0
// in which case any atomic operations would result in bus errors if there is a misalignment
// for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907
Expand Down Expand Up @@ -183,6 +184,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32);
STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32);
STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE);
STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket);
UINT32 parseExtId(PCHAR);

// visible for testing only
VOID onIceConnectionStateChange(UINT64, UINT64);
Expand Down
Loading

0 comments on commit afcf43e

Please sign in to comment.