Skip to content

Commit

Permalink
Process data channel messages sequentially (#1665)
Browse files Browse the repository at this point in the history
This prevents SCTP DataChannel messages from being processed out of order, causing warnings on channel initialization and potential issues with Last-N selection.
  • Loading branch information
bertm authored Jun 7, 2021
1 parent aa68343 commit b24f756
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,20 @@ class Endpoint @JvmOverloads constructor(
setErrorHandler(queueErrorCounter)
}

/**
* The queue which enforces sequential processing of incoming data channel messages
* to maintain processing order.
*/
private val incomingDataChannelMessagesQueue = PacketInfoQueue(
"${javaClass.simpleName}-incoming-data-channel-queue",
TaskPools.IO_POOL,
{ packetInfo ->
dataChannelHandler.consume(packetInfo)
true
},
TransportConfig.queueSize
)

private val bitrateController = BitrateController(
object : BitrateController.EventHandler {
override fun allocationChanged(allocation: BandwidthAllocation) {
Expand Down Expand Up @@ -565,7 +579,7 @@ class Endpoint @JvmOverloads constructor(
// holding a lock inside the SctpSocket which can cause a deadlock
// if two endpoints are trying to send datachannel messages to one
// another (with stats broadcasting it can happen often)
TaskPools.IO_POOL.execute { dataChannelHandler.processPacket(PacketInfo(dataChannelPacket)) }
incomingDataChannelMessagesQueue.add(PacketInfo(dataChannelPacket))
}
socket.listen()
sctpSocket = Optional.of(socket)
Expand Down Expand Up @@ -1099,7 +1113,7 @@ class Endpoint @JvmOverloads constructor(
private var dataChannelStack: DataChannelStack? = null
private val cachedDataChannelPackets = LinkedBlockingQueue<PacketInfo>()

override fun consume(packetInfo: PacketInfo) {
public override fun consume(packetInfo: PacketInfo) {
synchronized(dataChannelStackLock) {
when (val packet = packetInfo.packet) {
is DataChannelPacket -> {
Expand Down

0 comments on commit b24f756

Please sign in to comment.