diff --git a/docs/API/API-socket-options.md b/docs/API/API-socket-options.md index 19d3019f9..b503b2aea 100644 --- a/docs/API/API-socket-options.md +++ b/docs/API/API-socket-options.md @@ -58,7 +58,7 @@ Exchange for the initial key is done in the handshake. - `SRT_KM_S_SECURED` (`2`): KM exchange was successful and the data will be sent encrypted and will be decrypted by the receiver. This state is only possible on -both sides in both directions simultaneously. +both sides in both directions simultaneously. Any unencrypted packet will be dropped by the receiver. - `SRT_KM_S_NOSECRET` (`3`): If this state is in the sending direction (`SRTO_SNDKMSTATE`), then it means that the sending party has set a passphrase, but the peer did not. diff --git a/docs/API/statistics.md b/docs/API/statistics.md index a34478f64..d2038fb03 100644 --- a/docs/API/statistics.md +++ b/docs/API/statistics.md @@ -245,6 +245,8 @@ Packets may be dropped conditionally when both `SRTO_TSBPDMODE` and `SRTO_TLPKTD #### pktRcvUndecryptTotal The total number of packets that failed to be decrypted at the receiver side. Available for receiver. +The statistic also counts unencrypted packets that were expected to be uncrypted on a secured connection (see [SRTO_KM_S_SECURED](API-socket-options.md#srt_km_state)) +and hence dropped as not encrypted (undecrypted). #### pktSndFilterExtraTotal @@ -821,5 +823,5 @@ The ratio of unrecovered by the socket group packets `Dropped Packets Ratio` can ``` Dropped Packets Ratio = pktRcvDropTotal / pktSentUniqueTotal; in case both sender and receiver statistics is available -Dropped Packets Ratio = pktRcvDropTotal / (pktRecvUniqueTotal + pktRcvDropTotal); in case receiver only statistics is available -``` \ No newline at end of file +Dropped Packets Ratio = pktRcvDropTotal / (pktRecvUniqueTotal + pktRcvDropTotal); in case receiver only statistics is available +``` diff --git a/haicrypt/hcrypt.c b/haicrypt/hcrypt.c index 2568654b1..dc3f06801 100644 --- a/haicrypt/hcrypt.c +++ b/haicrypt/hcrypt.c @@ -320,6 +320,7 @@ int HaiCrypt_Clone(HaiCrypt_Handle hhcSrc, HaiCrypt_CryptoDir tx, HaiCrypt_Handl cryptoClone->ctx_pair[1].flags &= ~HCRYPT_CTX_F_ENCRYPT; memset(cryptoClone->ctx_pair[0].salt, 0, sizeof(cryptoClone->ctx_pair[0].salt)); cryptoClone->ctx_pair[0].salt_len = 0; + cryptoClone->ctx = &cryptoClone->ctx_pair[0]; } *phhc = (void *)cryptoClone; diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 7ec8ff570..9557c2c5c 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -657,6 +657,9 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID << " to that socket (" << ns->m_SocketID << ")"); m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID); + + LOGC(cnlog.Note, log << "@" << ns->m_SocketID << " connection on listener @" << listen + << " (" << ns->m_SelfAddr.str() << ") from peer @" << ns->m_PeerID << " (" << peer.str() << ")"); } catch (...) { diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 1f46788aa..631c4aa8d 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit) return 0; } -int CRcvBuffer::dropUpTo(int32_t seqno) +std::pair CRcvBuffer::dropUpTo(int32_t seqno) { IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno) if (len <= 0) { IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); - return 0; + return std::make_pair(0, 0); } m_iMaxPosOff -= len; if (m_iMaxPosOff < 0) m_iMaxPosOff = 0; - const int iDropCnt = len; + int iNumDropped = 0; // Number of dropped packets that were missing. + int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer. while (len > 0) { + // Note! Dropping a EntryState_Read must not be counted as a drop because it was read. + // Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier. + if (m_entries[m_iStartPos].status == EntryState_Avail) + ++iNumDiscarded; + else if (m_entries[m_iStartPos].status == EntryState_Empty) + ++iNumDropped; dropUnitInPos(m_iStartPos); m_entries[m_iStartPos].status = EntryState_Empty; SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty); @@ -239,14 +246,14 @@ int CRcvBuffer::dropUpTo(int32_t seqno) // If the nonread position is now behind the starting position, set it to the starting position and update. // Preceding packets were likely missing, and the non read position can probably be moved further now. - if (CSeqNo::seqcmp(m_iFirstNonreadPos, m_iStartPos) < 0) + if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; updateNonreadPos(); } if (!m_tsbpd.isEnabled() && m_bMessageAPI) updateFirstReadableOutOfOrder(); - return iDropCnt; + return std::make_pair(iNumDropped, iNumDiscarded); } int CRcvBuffer::dropAll() @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll() return 0; const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff); - return dropUpTo(end_seqno); + const std::pair numDropped = dropUpTo(end_seqno); + return numDropped.first + numDropped.second; } int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting) diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index d4b50fab7..f783ac2a2 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -66,8 +66,8 @@ class CRcvBuffer /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno). /// @param [in] seqno drop units up to this sequence number - /// @return number of dropped packets. - int dropUpTo(int32_t seqno); + /// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded). + std::pair dropUpTo(int32_t seqno); /// @brief Drop all the packets in the receiver buffer. /// The starting position and seqno are shifted right after the last packet in the buffer. @@ -200,6 +200,20 @@ class CRcvBuffer return (m_iMaxPosOff == 0); } + /// Returns the currently used number of cells, including + /// gaps with empty cells, or in other words, the distance + /// between the initial position and the youngest received packet. + size_t size() const + { + return m_iMaxPosOff; + } + + // Returns true if the buffer is full. Requires locking. + bool full() const + { + return size() == capacity(); + } + /// Return buffer capacity. /// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer". /// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously. @@ -239,9 +253,15 @@ class CRcvBuffer inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; } inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); } inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); } + + /// @brief Compares the two positions in the receiver buffer relative to the starting position. + /// @param pos2 a position in the receiver buffer. + /// @param pos1 a position in the receiver buffer. + /// @return a positive value if pos2 is ahead of pos1; a negative value, if pos2 is behind pos1; otherwise returns 0. inline int cmpPos(int pos2, int pos1) const { - // XXX maybe not the best implementation, but this keeps up to the rule + // XXX maybe not the best implementation, but this keeps up to the rule. + // Maybe use m_iMaxPosOff to ensure a position is not behind the m_iStartPos. const int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + (int)m_szSize - m_iStartPos; const int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + (int)m_szSize - m_iStartPos; @@ -327,9 +347,8 @@ class CRcvBuffer EntryStatus status; }; - //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } - - FixedArray m_entries; + typedef FixedArray entries_t; + entries_t m_entries; const size_t m_szSize; // size of the array of units (buffer) CUnitQueue* m_pUnitQueue; // the shared unit queue diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 084cad670..98b9d1287 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -119,6 +119,7 @@ const int UDT::ERROR = srt::CUDT::ERROR; 2[15..0]: TsbPD delay [0..60000] msec */ +// IMPORTANT!!! This array must be ordered by value, because std::binary_search is performed on it! extern const SRT_SOCKOPT srt_post_opt_list [SRT_SOCKOPT_NPOST] = { SRTO_SNDSYN, SRTO_RCVSYN, @@ -127,11 +128,14 @@ extern const SRT_SOCKOPT srt_post_opt_list [SRT_SOCKOPT_NPOST] = { SRTO_RCVTIMEO, SRTO_MAXBW, SRTO_INPUTBW, - SRTO_MININPUTBW, SRTO_OHEADBW, SRTO_SNDDROPDELAY, SRTO_DRIFTTRACER, + SRTO_MININPUTBW, SRTO_LOSSMAXTTL +#ifdef ENABLE_MAXREXMITBW + ,SRTO_MAXREXMITBW +#endif }; const int32_t @@ -294,9 +298,10 @@ void srt::CUDT::construct() m_iPeerTsbPdDelay_ms = 0; m_bPeerTsbPd = false; m_bTsbPd = false; - m_bTsbPdAckWakeup = false; + m_bTsbPdNeedsWakeup = false; m_bGroupTsbPd = false; m_bPeerTLPktDrop = false; + m_bBufferWasFull = false; // Initilize mutex and condition variables. initSynch(); @@ -554,6 +559,15 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen) optlen = sizeof(int32_t); break; +#ifdef ENABLE_MAXREXMITBW + case SRTO_MAXREXMITBW: + if (size_t(optlen) < sizeof(m_config.llMaxRexmitBW)) + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + *(int64_t*)optval = m_config.llMaxRexmitBW; + optlen = sizeof(int64_t); + break; +#endif + case SRTO_STATE: *(int32_t *)optval = uglobal().getStatus(m_SocketID); optlen = sizeof(int32_t); @@ -2078,9 +2092,9 @@ int srt::CUDT::processSrtMsg_HSREQ(const uint32_t *srtdata, size_t bytelen, uint return SRT_CMD_NONE; } - LOGC(cnlog.Note, log << "HSREQ/rcv: cmd=" << SRT_CMD_HSREQ << "(HSREQ) len=" << bytelen - << hex << " vers=0x" << srtdata[SRT_HS_VERSION] << " opts=0x" << srtdata[SRT_HS_FLAGS] - << dec << " delay=" << SRT_HS_LATENCY_RCV::unwrap(srtdata[SRT_HS_LATENCY])); + LOGC(cnlog.Debug, log << "HSREQ/rcv: cmd=" << SRT_CMD_HSREQ << "(HSREQ) len=" << bytelen + << hex << " vers=0x" << srtdata[SRT_HS_VERSION] << " opts=0x" << srtdata[SRT_HS_FLAGS] + << dec << " delay=" << SRT_HS_LATENCY_RCV::unwrap(srtdata[SRT_HS_LATENCY])); m_uPeerSrtVersion = srtdata[SRT_HS_VERSION]; m_uPeerSrtFlags = srtdata[SRT_HS_FLAGS]; @@ -3530,7 +3544,6 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) m_iISN = m_ConnReq.m_iISN = forced_isn; setInitialSndSeq(m_iISN); - // Inform the server my configurations. CPacket reqpkt; reqpkt.setControl(UMSG_HANDSHAKE); @@ -4925,8 +4938,9 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, } */ - - LOGC(cnlog.Note, log << CONID() << "Connection established to: " << m_PeerAddr.str()); + + LOGC(cnlog.Note, log << CONID() << "Connection established from (" + << m_SourceAddr.str() << ") to peer @" << m_PeerID << " (" << m_PeerAddr.str() << ")"); return CONN_ACCEPT; } @@ -5361,7 +5375,7 @@ void * srt::CUDT::tsbpd(void* param) CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond); CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker()); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; while (!self->m_bClosing) { steady_clock::time_point tsNextDelivery; // Next packet delivery time @@ -5381,6 +5395,21 @@ void * srt::CUDT::tsbpd(void* param) const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time); tsNextDelivery = info.tsbpd_time; +#if ENABLE_HEAVY_LOGGING + if (info.seqno == SRT_SEQNO_NONE) + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: NO PACKETS"); + } + else + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: %" + << info.seqno << " T=" << FormatTime(tsNextDelivery) + << " diff-now-playtime=" << FormatDuration(tnow - tsNextDelivery) + << " ready=" << is_time_to_deliver + << " ondrop=" << info.seq_gap); + } +#endif + if (!self->m_bTLPktDrop) { rxready = !info.seq_gap && is_time_to_deliver; @@ -5416,8 +5445,8 @@ void * srt::CUDT::tsbpd(void* param) if (rxready) { HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " - << (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)"); + log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " + << FormatDuration(steady_clock::now() - info.tsbpd_time) << ")"); /* * There are packets ready to be delivered * signal a waiting "recv" call if there is any data available @@ -5480,6 +5509,8 @@ void * srt::CUDT::tsbpd(void* param) if (self->m_bClosing) break; + SRT_ATR_UNUSED bool bWokeUpOnSignal = true; + if (!is_zero(tsNextDelivery)) { IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow); @@ -5487,12 +5518,12 @@ void * srt::CUDT::tsbpd(void* param) * Buffer at head of queue is not ready to play. * Schedule wakeup when it will be. */ - self->m_bTsbPdAckWakeup = false; + self->m_bTsbPdNeedsWakeup = false; HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno - << " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms"); + log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno + << " T=" << FormatTime(tsNextDelivery) << " - waiting " << FormatDuration(timediff)); THREAD_PAUSED(); - tsbpd_cc.wait_until(tsNextDelivery); + bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery); THREAD_RESUMED(); } else @@ -5509,20 +5540,22 @@ void * srt::CUDT::tsbpd(void* param) * - Closing the connection */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; THREAD_PAUSED(); tsbpd_cc.wait(); THREAD_RESUMED(); } - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - " + << "NOW=" << FormatTime(steady_clock::now())); } THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; } -int srt::CUDT::rcvDropTooLateUpTo(int seqno) +int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason) { // Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders. if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) @@ -5530,16 +5563,22 @@ int srt::CUDT::rcvDropTooLateUpTo(int seqno) dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno)); - const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno); - if (iDropCnt > 0) + const std::pair iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno); + const int iDropCnt = iDropDiscardedPkts.first; + const int iDiscardedCnt = iDropDiscardedPkts.second; + const int iDropCntTotal = iDropCnt + iDiscardedCnt; + + // In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket. + const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal; + if (iDropStatCnt > 0) { enterCS(m_StatsLock); // Estimate dropped bytes from average payload size. const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize(); - m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); + m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt)); leaveCS(m_StatsLock); } - return iDropCnt; + return iDropCntTotal; } void srt::CUDT::setInitialRcvSeq(int32_t isn) @@ -5609,6 +5648,14 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd return true; } +int srt::CUDT::getAuthTagSize() const +{ + if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) + return HAICRYPT_AUTHTAG_MAX; + + return 0; +} + bool srt::CUDT::prepareBuffers(CUDTException* eout) { if (m_pSndBuffer) @@ -5620,7 +5667,14 @@ bool srt::CUDT::prepareBuffers(CUDTException* eout) try { // CryptoControl has to be initialized and in case of RESPONDER the KM REQ must be processed (interpretSrtHandshake(..)) for the crypto mode to be deduced. - const int authtag = (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) ? HAICRYPT_AUTHTAG_MAX : 0; + const int authtag = getAuthTagSize(); + + SRT_ASSERT(m_iMaxSRTPayloadSize != 0); + + HLOGC(rslog.Debug, log << CONID() << "Creating buffers: snd-plsize=" << m_iMaxSRTPayloadSize + << " snd-bufsize=" << 32 + << " authtag=" << authtag); + m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize, authtag); SRT_ASSERT(m_iPeerISN != -1); m_pRcvBuffer = new srt::CRcvBuffer(m_iPeerISN, m_config.iRcvBufSize, m_pRcvQueue->m_pUnitQueue, m_config.bMessageAPI); @@ -5746,6 +5800,16 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& throw CUDTException(MJ_SETUP, MN_REJECTED, 0); } +#if ENABLE_BONDING + // The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called. + // Keep the group alive for the lifetime of this function, + // and do it BEFORE acquiring m_ConnectionLock to avoid + // lock inversion. + // This will check if a socket belongs to a group and if so + // it will remember this group and keep it alive here. + CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); +#endif + if (!prepareBuffers(NULL)) { HLOGC(cnlog.Debug, @@ -6304,7 +6368,7 @@ int srt::CUDT::receiveBuffer(char *data, int len) throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); } - // Kick TsbPd thread to schedule next wakeup (if running) + // Kick TsbPd thread to schedule the next wakeup (if running) if (m_config.iRcvTimeOut < 0) { THREAD_PAUSED(); @@ -6415,7 +6479,7 @@ int srt::CUDT::sndDropTooLate() // If some packets were dropped update stats, socket state, loss list and the parent group if any. enterCS(m_StatsLock); - m_stats.sndr.dropped.count(dbytes);; + m_stats.sndr.dropped.count(stats::BytesPackets((uint64_t)dbytes, (uint32_t)dpkts)); leaveCS(m_StatsLock); IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck); @@ -6833,6 +6897,12 @@ bool srt::CUDT::isRcvBufferReadyNoLock() const return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); } +bool srt::CUDT::isRcvBufferFull() const +{ + ScopedLock lck(m_RcvBufferLock); + return m_pRcvBuffer->full(); +} + // int by_exception: accepts values of CUDTUnited::ErrorHandling: // - 0 - by return value // - 1 - by exception @@ -7614,12 +7684,13 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg) // - m_dPktSndPeriod // - m_dCWndSize m_tdSendInterval = microseconds_from((int64_t)m_CongCtl->pktSndPeriod_us()); - m_dCongestionWindow = m_CongCtl->cgWindowSize(); + const double cgwindow = m_CongCtl->cgWindowSize(); + m_dCongestionWindow = cgwindow; #if ENABLE_HEAVY_LOGGING HLOGC(rslog.Debug, - log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us (" - << "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" - << std::setprecision(3) << m_dCongestionWindow); + log << CONID() << "updateCC: updated values from congctl: interval=" << FormatDuration(m_tdSendInterval) + << " (cfg:" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" + << std::setprecision(3) << cgwindow); #endif } @@ -7697,40 +7768,6 @@ void srt::CUDT::releaseSynch() leaveCS(m_RecvLock); } - -#if ENABLE_BONDING -void srt::CUDT::dropToGroupRecvBase() -{ - int32_t group_recv_base = SRT_SEQNO_NONE; - if (m_parent->m_GroupOf) - { - // Check is first done before locking to avoid unnecessary - // mutex locking. The condition for this field is that it - // can be either never set, already reset, or ever set - // and possibly dangling. The re-check after lock eliminates - // the dangling case. - ScopedLock glock (uglobal().m_GlobControlLock); - - // Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock, - // but this is an intended order. - if (m_parent->m_GroupOf) - group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo(); - } - if (group_recv_base == SRT_SEQNO_NONE) - return; - - ScopedLock lck(m_RcvBufferLock); - int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base)); - if (cnt > 0) - { - HLOGC(grlog.Debug, - log << CONID() << "dropToGroupRecvBase: dropped " << cnt << " packets before ACK: group_recv_base=" - << group_recv_base << " m_iRcvLastAck=" << m_iRcvLastAck - << " m_iRcvCurrSeqNo=" << m_iRcvCurrSeqNo << " m_bTsbPd=" << m_bTsbPd); - } -} -#endif - namespace srt { #if ENABLE_HEAVY_LOGGING static void DebugAck(string hdr, int prev, int ack) @@ -7947,10 +7984,6 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) #endif string reason; // just for "a reason" of giving particular % for ACK -#if ENABLE_BONDING - dropToGroupRecvBase(); -#endif - // The TSBPD thread may change the first lost sequence record (TLPKTDROP). // To avoid it the m_RcvBufferLock has to be acquired. UniqueLock bufflock(m_RcvBufferLock); @@ -8058,7 +8091,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) /* Newly acknowledged data, signal TsbPD thread */ CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond); // m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread - if (m_bTsbPdAckWakeup) + if (m_bTsbPdNeedsWakeup) tslcc.notify_one(); } else @@ -8121,7 +8154,8 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) else if (!bNeedFullAck) { // Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?) - LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck); + LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr(" << reason << ") %" + << ack << " <% last %" << m_iRcvLastAck); return nbsent; } @@ -8612,16 +8646,15 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { - const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); #if ENABLE_BONDING - if (drift_updated && m_parent->m_GroupOf) - { - ScopedLock glock(uglobal().m_GlobControlLock); - if (m_parent->m_GroupOf) - { - m_parent->m_GroupOf->synchronizeDrift(this); - } - } + ScopedLock glock(uglobal().m_GlobControlLock); + const bool drift_updated = +#endif + m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); + +#if ENABLE_BONDING + if (drift_updated) + m_parent->m_GroupOf->synchronizeDrift(this); #endif } @@ -10043,7 +10076,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& } } } - else if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) + else if (m_pCryptoControl && m_pCryptoControl->m_RcvKmState == SRT_KM_S_SECURED) { // Unencrypted packets are not allowed. const int iDropCnt = m_pRcvBuffer->dropMessage(u->m_Packet.getSeqNo(), u->m_Packet.getSeqNo(), SRT_MSGNO_NONE, CRcvBuffer::DROP_EXISTING); @@ -11181,7 +11214,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) } } } - LOGC(cnlog.Note, log << CONID() << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType)); + LOGC(cnlog.Debug, log << CONID() << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType)); return RejectReasonForURQ(hs.m_iReqType); } diff --git a/srtcore/core.h b/srtcore/core.h index 71c955c33..d865a48e1 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -116,7 +116,12 @@ enum AckDataItem }; const size_t ACKD_FIELD_SIZE = sizeof(int32_t); +#ifdef ENABLE_MAXREXMITBW +static const size_t SRT_SOCKOPT_NPOST = 13; +#else static const size_t SRT_SOCKOPT_NPOST = 12; +#endif + extern const SRT_SOCKOPT srt_post_opt_list []; enum GroupDataItem @@ -318,6 +323,7 @@ class CUDT #endif int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; } + SRT_ATTR_REQUIRES(m_RecvAckLock) int flowWindowSize() const { return m_iFlowWindowSize; } int32_t deliveryRate() const { return m_iDeliveryRate; } int bandwidth() const { return m_iBandwidth; } @@ -365,6 +371,7 @@ class CUDT /// Returns the number of packets in flight (sent, but not yet acknowledged). /// @returns The number of packets in flight belonging to the interval [0; ...) + SRT_ATTR_REQUIRES(m_RecvAckLock) int32_t getFlightSpan() const { return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo); @@ -494,6 +501,7 @@ class CUDT /// Allocates sender and receiver buffers and loss lists. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) bool prepareBuffers(CUDTException* eout); + int getAuthTagSize() const; SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) EConnectStatus postConnect(const CPacket* response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT; @@ -668,6 +676,8 @@ class CUDT /// the receiver fresh loss list. void unlose(const CPacket& oldpacket); void dropFromLossLists(int32_t from, int32_t to); + + SRT_ATTR_REQUIRES(m_RecvAckLock) bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason); SRT_ATTR_EXCLUDES(m_ConnectionLock) @@ -723,14 +733,24 @@ class CUDT SRT_ATTR_REQUIRES(m_RcvBufferLock) bool isRcvBufferReadyNoLock() const; + SRT_ATTR_EXCLUDES(m_RcvBufferLock) + bool isRcvBufferFull() const; + // TSBPD thread main function. static void* tsbpd(void* param); + enum DropReason + { + DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP). + DROP_DISCARD //< Drop because another group member already provided these packets. + }; + /// Drop too late packets (receiver side). Update loss lists and ACK positions. /// The @a seqno packet itself is not dropped. /// @param seqno [in] The sequence number of the first packets following those to be dropped. + /// @param reason A reason for dropping (see @a DropReason). /// @return The number of packets dropped. - int rcvDropTooLateUpTo(int seqno); + int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE); static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt); static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt); @@ -951,12 +971,11 @@ class CUDT sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock - bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent + bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change. sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining CallbackHolder m_cbAcceptHook; CallbackHolder m_cbConnectHook; - // FORWARDER public: static int installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq); @@ -1095,7 +1114,8 @@ class CUDT /// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy). int handleSocketPacketReception(const std::vector& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs); - /// Get the packet's TSBPD time. + /// Get the packet's TSBPD time - + /// the time when it is passed to the reading application. /// The @a grp passed by void* is not used yet /// and shall not be used when ENABLE_BONDING=0. time_point getPktTsbPdTime(void* grp, const CPacket& packet); @@ -1115,12 +1135,6 @@ class CUDT static void addLossRecord(std::vector& lossrecord, int32_t lo, int32_t hi); int32_t bake(const sockaddr_any& addr, int32_t previous_cookie = 0, int correction = 0); -#if ENABLE_BONDING - /// @brief Drop packets in the recv buffer behind group_recv_base. - /// Updates m_iRcvLastSkipAck if it's behind group_recv_base. - void dropToGroupRecvBase(); -#endif - void processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival); diff --git a/srtcore/fec.cpp b/srtcore/fec.cpp index a41e3a33b..76a47384a 100644 --- a/srtcore/fec.cpp +++ b/srtcore/fec.cpp @@ -598,6 +598,9 @@ void FECFilterBuiltin::ClipData(Group& g, uint16_t length_net, uint8_t kflg, g.flag_clip = g.flag_clip ^ kflg; g.timestamp_clip = g.timestamp_clip ^ timestamp_hw; + HLOGC(pflog.Debug, log << "FEC CLIP: data pkt.size=" << payload_size + << " to a clip buffer size=" << payloadSize()); + // Payload goes "as is". for (size_t i = 0; i < payload_size; ++i) { diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 001dd4802..94265f420 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -259,6 +259,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_uOPT_MinStabilityTimeout_us(1000 * CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS) // -1 = "undefined"; will become defined with first added socket , m_iMaxPayloadSize(-1) + , m_iAvgPayloadSize(-1) , m_bSynRecving(true) , m_bSynSending(true) , m_bTsbPd(true) @@ -845,18 +846,9 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) set_currentSchedSequence(core.ISN()); } - // XXX - // Might need further investigation as to whether this isn't - // wrong for some cases. By having this -1 here the value will be - // laziliy set from the first reading one. It is believed that - // it covers all possible scenarios, that is: - // - // - no readers - no problem! - // - have some readers and a new is attached - this is set already - // - connect multiple links, but none has read yet - you'll be the first. - // - // Previous implementation used setting to: core.m_iPeerISN - resetInitialRxSequence(); + // Only set if was not initialized to avoid problems on a running connection. + if (m_RcvBaseSeqNo == SRT_SEQNO_NONE) + m_RcvBaseSeqNo = CSeqNo::decseq(core.m_iPeerISN); // Get the latency (possibly fixed against the opposite side) // from the first socket (core.m_iTsbPdDelay_ms), @@ -2024,10 +2016,14 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& } else { - // No read-readiness reported by epoll, but probably missed or not yet handled - // as the receiver buffer is read-ready. + // No read-readiness reported by epoll, but can be missed or not yet handled + // while the receiver buffer is in fact read-ready. ScopedLock lg(sock->core().m_RcvBufferLock); - if (sock->core().m_pRcvBuffer && sock->core().m_pRcvBuffer->isRcvDataReady()) + if (!sock->core().m_pRcvBuffer) + continue; + // Checking for the next packet in the RCV buffer is safer that isReadReady(tnow). + const CRcvBuffer::PacketInfo info = sock->core().m_pRcvBuffer->getFirstValidPacketInfo(); + if (info.seqno != SRT_SEQNO_NONE && !info.seq_gap) readReady.push_back(sock); } } @@ -2197,6 +2193,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } // Find the first readable packet among all member sockets. + steady_clock::time_point tnow = steady_clock::now(); CUDTSocket* socketToRead = NULL; CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()}; for (vector::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si) @@ -2217,7 +2214,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } const CRcvBuffer::PacketInfo info = - ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()); + ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(tnow); if (info.seqno == SRT_SEQNO_NONE) { HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read."); @@ -2237,6 +2234,12 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { socketToRead = ps; infoToRead = info; + + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && ((CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) == 1)) + { + // We have the next packet. No need to check other read-ready sockets. + break; + } } } @@ -2285,6 +2288,20 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } fillGroupData((w_mc), w_mc); + // m_RcvBaseSeqNo is expected to be set to the PeerISN with the first connected member, + // so a packet drop at the start should also be detected by this condition. + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) + { + const int32_t iNumDropped = (CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1; + if (iNumDropped > 0) + { + m_stats.recvDrop.count(stats::BytesPackets(iNumDropped * static_cast(avgRcvPacketSize()), iNumDropped)); + LOGC(grlog.Warn, + log << "@" << m_GroupID << " GROUP RCV-DROPPED " << iNumDropped << " packet(s): seqno %" + << CSeqNo::incseq(m_RcvBaseSeqNo) << " to %" << CSeqNo::decseq(w_mc.pktseq)); + } + } + HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq); m_RcvBaseSeqNo = w_mc.pktseq; @@ -2300,7 +2317,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) ScopedLock lg(ps->core().m_RcvBufferLock); if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) { - const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD); if (cnt > 0) { HLOGC(grlog.Debug, diff --git a/srtcore/packetfilter.cpp b/srtcore/packetfilter.cpp index 37785f43a..9610f04e3 100644 --- a/srtcore/packetfilter.cpp +++ b/srtcore/packetfilter.cpp @@ -314,9 +314,15 @@ bool srt::PacketFilter::configure(CUDT* parent, CUnitQueue* uq, const std::strin init.socket_id = parent->socketID(); init.snd_isn = parent->sndSeqNo(); init.rcv_isn = parent->rcvSeqNo(); - init.payload_size = parent->OPT_PayloadSize(); + + // XXX This is a formula for a full "SRT payload" part that undergoes transmission, + // might be nice to have this formula as something more general. + init.payload_size = parent->OPT_PayloadSize() + parent->getAuthTagSize(); init.rcvbuf_size = parent->m_config.iRcvBufSize; + HLOGC(pflog.Debug, log << "PFILTER: @" << init.socket_id << " payload size=" + << init.payload_size << " rcvbuf size=" << init.rcvbuf_size); + // Found a filter, so call the creation function m_filter = selector->second->Create(init, m_provided, confstr); if (!m_filter) diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 863148b34..23c44fc01 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1092,8 +1092,8 @@ bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst, if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat) { HLOGC(cnlog.Debug, - log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0 - << " ms passed since last connection request."); + log << "RID:@" << i->m_iID << " " << FormatDuration(tsNow - tsLastReq) + << " passed since last connection request."); continue; } @@ -1407,7 +1407,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, ScopedLock cg(m_LSLock); if (m_pListener) { - LOGC(cnlog.Note, log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID()); + LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << m_pListener->socketID()); listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet); // This function does return a code, but it's hard to say as to whether @@ -1426,8 +1426,8 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, if (have_listener) // That is, the above block with m_pListener->processConnectRequest was executed { - LOGC(cnlog.Note, - log << CONID() << "Listener managed the connection request from: " << addr.str() + LOGC(cnlog.Debug, + log << CONID() << "Listener got the connection request from: " << addr.str() << " result:" << RequestTypeStr(UDTRequestType(listener_ret))); return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT; } diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 31e05b205..82cae300f 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -654,7 +654,7 @@ class UniquePtr: public std::auto_ptr bool operator==(const element_type* two) const { return get() == two; } bool operator!=(const element_type* two) const { return get() != two; } - operator bool () { return 0!= get(); } + operator bool () const { return 0!= get(); } }; // A primitive one-argument versions of Sprint and Printable