Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Optimization - Refactor TWCC (#1934) #2075

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
// Set the ICE mode explicitly
configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL;

configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE;
stefankiesz marked this conversation as resolved.
Show resolved Hide resolved
// Set the STUN server
PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
Expand Down
131 changes: 97 additions & 34 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE);
pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager));
CHK(pKvsPeerConnection->pTwccManager != NULL, STATUS_NOT_ENOUGH_MEMORY);
CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH,
&pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
}

*ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection;
Expand Down Expand Up @@ -1041,10 +1043,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsPeerConnection pKvsPeerConnection;
PKvsPeerConnection pKvsPeerConnection = NULL;
PDoubleListNode pCurNode = NULL;
UINT64 item = 0;
UINT64 startTime;
UINT32 twccHashTableCount = 0;
BOOL twccLocked = FALSE;

CHK(ppPeerConnection != NULL, STATUS_NULL_ARG);

Expand Down Expand Up @@ -1113,12 +1117,21 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
}

if (pKvsPeerConnection->pTwccManager != NULL) {
MUTEX_LOCK(pKvsPeerConnection->twccLock);
twccLocked = TRUE;
if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) {
DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount);
}
CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry));
CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE;
}
// twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets)
// we should not deallocate items but we do need to clear the queue
CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE));
SAFE_MEMFREE(pKvsPeerConnection->pTwccManager);
}

Expand All @@ -1127,8 +1140,17 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)

PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection");
SAFE_MEMFREE(*ppPeerConnection);
ppPeerConnection = NULL;
CleanUp:

if (ppPeerConnection != NULL) {
stefankiesz marked this conversation as resolved.
Show resolved Hide resolved
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
}
}
LEAVES();
return retStatus;
}
Expand Down Expand Up @@ -1826,47 +1848,88 @@ STATUS deinitKvsWebRtc(VOID)
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket)
// Not thread safe. Ensure this function is invoked in a guarded section
static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT16 updatedSeqNum = 0;
PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL;
UINT64 ageOfOldest = 0, firstRtpTime = 0;
UINT64 twccPacketValue = 0;
BOOL isCheckComplete = FALSE;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG);

updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow;
do {
// If the seqNum is not present in the hash table, it is ok. We move on to the next
if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) {
tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue;
}
if (tempTwccRtpPktInfo != NULL) {
firstRtpTime = tempTwccRtpPktInfo->localTimeKvs;
// Would be the case if the timestamps are not monotonically increasing.
if (pRtpPacket->sentTime >= firstRtpTime) {
ageOfOldest = pRtpPacket->sentTime - firstRtpTime;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
// If the seqNum is not present in the hash table, move on. However, this case should not happen
// given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists
if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) {
SAFE_MEMFREE(tempTwccRtpPktInfo);
}
updatedSeqNum++;
} else {
isCheckComplete = TRUE;
}
} else {
// Move to the next seqNum to check if we can remove the next one atleast
DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64,
updatedSeqNum, firstRtpTime, pRtpPacket->sentTime);
updatedSeqNum++;
}
} else {
updatedSeqNum++;
}
// reset before next iteration
tempTwccRtpPktInfo = NULL;
} while (!isCheckComplete && updatedSeqNum != (seqNum + 1));

// Update regardless. The loop checks until current RTP packets seq number irrespective of the failure
pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum;
CleanUp:
LEAVES();
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
UINT64 sn = 0;
UINT16 seqNum;
BOOL isEmpty = FALSE;
INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest;
CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS);
UINT16 seqNum = 0;
PTwccRtpPacketInfo pTwccRtpPktInfo = NULL;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS);
CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS);

MUTEX_LOCK(pc->twccLock);
MUTEX_LOCK(pKvsPeerConnection->twccLock);
locked = TRUE;

CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY);

pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength;
pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime;
pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME;
seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload);
CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum));
pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum;
pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength;
pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime;
pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME;
pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime;

// cleanup queue until it contains up to 2 seconds of sent packets
do {
CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn));
firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs;
lastLocalTimeKvs = pRtpPacket->sentTime;
ageOfOldest = lastLocalTimeKvs - firstTimeKvs;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn));
CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty));
} else {
break;
}
} while (!isEmpty);
CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo));

// Ensure twccRollingWindowDeletion is run in a guarded section
CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum));
CleanUp:
if (locked) {
MUTEX_UNLOCK(pc->twccLock);
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
}
CHK_LOG_ERR(retStatus);

Expand Down
18 changes: 10 additions & 8 deletions src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern "C" {
#define CODEC_HASH_TABLE_BUCKET_LENGTH 2
#define RTX_HASH_TABLE_BUCKET_COUNT 50
#define RTX_HASH_TABLE_BUCKET_LENGTH 2
#define TWCC_HASH_TABLE_BUCKET_COUNT 100
#define TWCC_HASH_TABLE_BUCKET_LENGTH 2

#define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200
#define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2
Expand All @@ -47,17 +49,16 @@ typedef enum {
} RTX_CODEC;

typedef struct {
UINT16 seqNum;
UINT16 packetSize;
UINT64 localTimeKvs;
UINT64 remoteTimeKvs;
} TwccPacket, *PTwccPacket;
UINT32 packetSize;
} TwccRtpPacketInfo, *PTwccRtpPacketInfo;

typedef struct {
StackQueue twccPackets;
TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality
UINT64 lastLocalTimeKvs;
UINT16 lastReportedSeqNum;
PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket]
UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window
UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response
UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response
} TwccManager, *PTwccManager;

typedef struct {
Expand All @@ -70,7 +71,7 @@ typedef struct {

typedef struct {
RtcPeerConnection peerConnection;
// UINT32 padding padding makes transportWideSequenceNumber 64bit aligned
// UINT32 padding makes transportWideSequenceNumber 64bit aligned
// we put atomics at the top of structs because customers application could set the packing to 0
// in which case any atomic operations would result in bus errors if there is a misalignment
// for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907
Expand Down Expand Up @@ -183,6 +184,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32);
STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32);
STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE);
STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket);
UINT32 parseExtId(PCHAR);

// visible for testing only
VOID onIceConnectionStateChange(UINT64, UINT64);
Expand Down
Loading
Loading