diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt index 152a1587e6..079001f481 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt @@ -78,7 +78,7 @@ import java.util.concurrent.ScheduledExecutorService class RtpReceiverImpl @JvmOverloads constructor( val id: String, /** - * A function to be used when these receiver wants to send RTCP packets to the + * A function to be used when the receiver wants to send RTCP packets to the * participant it's receiving data from (NACK packets, for example) */ private val rtcpSender: (RtcpPacket) -> Unit = {}, diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt index 639a4373ab..241fbde39e 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt @@ -48,5 +48,10 @@ abstract class RtpSender : abstract fun isFeatureEnabled(feature: Features): Boolean abstract fun tearDown() + /** + * An optional function to be executed for each RTP packet, as the first step of the send pipeline. + */ + var preProcesor: ((PacketInfo) -> PacketInfo?)? = null + abstract val bandwidthEstimator: BandwidthEstimator } diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt index 3c8272d11f..db2ae9d031 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt @@ -41,6 +41,7 @@ import org.jitsi.nlj.transform.node.PacketStreamStatsNode import org.jitsi.nlj.transform.node.SrtcpEncryptNode import org.jitsi.nlj.transform.node.SrtpEncryptNode import org.jitsi.nlj.transform.node.ToggleablePcapWriter +import org.jitsi.nlj.transform.node.TransformerNode import org.jitsi.nlj.transform.node.outgoing.AbsSendTime import org.jitsi.nlj.transform.node.outgoing.HeaderExtEncoder import org.jitsi.nlj.transform.node.outgoing.HeaderExtStripper @@ -140,6 +141,12 @@ class RtpSenderImpl( incomingPacketQueue.setErrorHandler(queueErrorCounter) outgoingRtpRoot = pipeline { + node(object : TransformerNode("Pre-processor") { + override fun transform(packetInfo: PacketInfo): PacketInfo? { + return preProcesor?.invoke(packetInfo) ?: packetInfo + } + override fun trace(f: () -> Unit) {} + }) node(AudioRedHandler(streamInformationStore, logger)) node(HeaderExtStripper(streamInformationStore)) node(outgoingPacketCache) diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt index d2302cf63f..8c4b468490 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt @@ -108,7 +108,7 @@ class Transceiver( */ fun isReceivingVideo(): Boolean = rtpReceiver.isReceivingVideo() - private val rtpSender: RtpSender = RtpSenderImpl( + val rtpSender: RtpSender = RtpSenderImpl( id, rtcpEventNotifier, senderExecutor, diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index 9ce14e42f4..1b6a3c7bab 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -297,6 +297,42 @@ class Endpoint @JvmOverloads constructor( addEndpointConnectionStatsListener(rttListener) setLocalSsrc(MediaType.AUDIO, conference.localAudioSsrc) setLocalSsrc(MediaType.VIDEO, conference.localVideoSsrc) + rtpSender.preProcesor = { packetInfo -> preProcess(packetInfo) } + } + + /** + * Perform processing of the packet before it goes through the rest of the [transceiver] send pipeline: + * 1. Update the bitrate controller state and apply the source projection logic + * 2. Perform SSRC re-writing if [doSsrcRewriting] is set. + */ + private fun preProcess(packetInfo: PacketInfo): PacketInfo? { + when (val packet = packetInfo.packet) { + is VideoRtpPacket -> { + if (!bitrateController.transformRtp(packetInfo)) { + logger.warn("Dropping a packet which was supposed to be accepted:$packet") + return null + } + // The original packet was transformed in place. + if (doSsrcRewriting) { + val start = packet !is ParsedVideoPacket || (packet.isKeyframe && packet.isStartOfFrame) + if (!videoSsrcs.rewriteRtp(packet, start)) { + return null + } + } + } + is AudioRtpPacket -> if (doSsrcRewriting) audioSsrcs.rewriteRtp(packet) + is RtcpSrPacket -> { + // Allow the BC to update the timestamp (in place). + bitrateController.transformRtcp(packet) + if (doSsrcRewriting) { + // Just check both tables instead of looking up the type first. + if (!videoSsrcs.rewriteRtcp(packet) && !audioSsrcs.rewriteRtcp(packet)) { + return null + } + } + } + } + return packetInfo } private val bandwidthProbing = BandwidthProbing( @@ -890,40 +926,7 @@ class Endpoint @JvmOverloads constructor( } } - override fun send(packetInfo: PacketInfo) { - when (val packet = packetInfo.packet) { - is VideoRtpPacket -> { - if (bitrateController.transformRtp(packetInfo)) { - // The original packet was transformed in place. - if (doSsrcRewriting) { - val start = packet !is ParsedVideoPacket || (packet.isKeyframe && packet.isStartOfFrame) - if (!videoSsrcs.rewriteRtp(packet, start)) { - return - } - } - transceiver.sendPacket(packetInfo) - } else { - logger.warn("Dropping a packet which was supposed to be accepted:$packet") - } - return - } - is AudioRtpPacket -> if (doSsrcRewriting) audioSsrcs.rewriteRtp(packet) - is RtcpSrPacket -> { - // Allow the BC to update the timestamp (in place). - bitrateController.transformRtcp(packet) - if (doSsrcRewriting) { - // Just check both tables instead of looking up the type first. - if (!videoSsrcs.rewriteRtcp(packet) && !audioSsrcs.rewriteRtcp(packet)) { - return - } - } - logger.trace { - "relaying an sr from ssrc=${packet.senderSsrc}, timestamp=${packet.senderInfo.rtpTimestamp}" - } - } - } - transceiver.sendPacket(packetInfo) - } + override fun send(packetInfo: PacketInfo) = transceiver.sendPacket(packetInfo) /** * To find out whether the endpoint should be expired, we check the activity timestamps from the transceiver. diff --git a/resources/analyze-timeline2.pl b/resources/analyze-timeline2.pl index a4b1df63e6..9b4987b306 100755 --- a/resources/analyze-timeline2.pl +++ b/resources/analyze-timeline2.pl @@ -25,9 +25,6 @@ the clone on the sender queue (see Conference#sendOut). This workload scales with the number of local endpoits and relays, and in a large conference is the most computationally expensive. -Note that currently Endpoint#send(PacketInfo) contains code for SSRC rewriting and a transformation from the BitrateController. -This can be offloaded to the sender queue and we plan to do so soon. - 4.Sender Queue: From the time the Receiver Pipeline thread places the packet on the Sender Queue, to the time another CPU thread removes it from the queue and starts executing the Sender Pipeline.