Skip to content

Commit

Permalink
feat: add async exception tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
lchenut committed Oct 4, 2024
1 parent 31f6a61 commit 39cf693
Showing 1 changed file with 39 additions and 37 deletions.
76 changes: 39 additions & 37 deletions webrtc/datachannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
# those terms.

import tables

import chronos,
chronicles,
binary_serialization

import sctp
import chronos, chronicles, binary_serialization
import errors, sctp/[sctp_transport, sctp_connection]

export binary_serialization

Expand Down Expand Up @@ -85,12 +81,12 @@ type
conn*: SctpConn
incomingStreams: AsyncQueue[DataChannelStream]

proc read*(stream: DataChannelStream): Future[seq[byte]] {.async.} =
proc read*(stream: DataChannelStream): Future[seq[byte]] {.async: (raises: [CancelledError]).} =
let x = await stream.receivedData.popFirst()
trace "read", length=x.len(), id=stream.id
return x

proc write*(stream: DataChannelStream, buf: seq[byte]) {.async.} =
proc write*(stream: DataChannelStream, buf: seq[byte]) {.async: (raises: [CancelledError, WebRtcError]).} =
trace "write", length=buf.len(), id=stream.id
var
sendInfo = SctpMessageParameters(
Expand All @@ -110,7 +106,7 @@ proc write*(stream: DataChannelStream, buf: seq[byte]) {.async.} =
else:
await stream.conn.write(buf, sendInfo)

proc sendControlMessage(stream: DataChannelStream, msg: DataChannelMessage) {.async.} =
proc sendControlMessage(stream: DataChannelStream, msg: DataChannelMessage) {.async: (raises: [CancelledError, WebRtcError]).} =
let
encoded = Binary.encode(msg)
sendInfo = SctpMessageParameters(
Expand Down Expand Up @@ -168,49 +164,54 @@ proc openStream*(
await stream.sendControlMessage(msg)
return stream

proc handleData(conn: DataChannelConnection, msg: SctpMessage) =
proc handleData(conn: DataChannelConnection, msg: SctpMessage) {.async: (raises: [CancelledError, WebRtcError]).} =
let streamId = msg.params.streamId
trace "handle data message", streamId, ppid = msg.params.protocolId, data = msg.data

if streamId notin conn.streams:
raise newException(ValueError, "got data for unknown streamid")

let stream = conn.streams[streamId]

#TODO handle string vs binary
if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]:
# PPID indicate empty message
stream.receivedData.addLastNoWait(@[])
else:
stream.receivedData.addLastNoWait(msg.data)

proc handleControl(conn: DataChannelConnection, msg: SctpMessage) {.async.} =
let
decoded = Binary.decode(msg.data, DataChannelMessage)
streamId = msg.params.streamId
conn.streams.withValue(streamId, stream):
#TODO handle string vs binary
if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]:
# PPID indicate empty message
await stream.receivedData.addLast(@[])
else:
await stream.receivedData.addLast(msg.data)
do:
raise newException(WebRtcError, "DataChannel - Got data for unknown StreamID")

proc handleControl(
conn: DataChannelConnection, msg: SctpMessage
) {.async: (raises: [CancelledError, WebRtcError]).} =
let decoded =
try:
Binary.decode(msg.data, DataChannelMessage)
except SerializationError as exc:
raise newException(WebRtcError, "DataChannel - " & exc.msg, exc)
let streamId = msg.params.streamId

trace "handle control message", decoded, streamId = msg.params.streamId
if decoded.messageType == Ack:
if streamId notin conn.streams:
raise newException(ValueError, "got ack for unknown streamid")
conn.streams[streamId].acked = true
conn.streams.withValue(streamId, stream):
if stream.acked == true:
trace "Received ACK twice on the same StreamID", streamId
stream.acked = true
do:
raise newException(WebRtcError, "DataChannel - Got ACK for unknown StreamID")
elif decoded.messageType == Open:
if streamId in conn.streams:
raise newException(ValueError, "got open for already existing streamid")

raise newException(WebRtcError, "DataChannel - Got open for already existing StreamID")
let stream = DataChannelStream(
id: streamId, conn: conn.conn,
id: streamId,
conn: conn.conn,
reliability: decoded.openMessage.channelType,
reliabilityParameter: decoded.openMessage.reliabilityParameter,
receivedData: newAsyncQueue[seq[byte]]()
)

conn.streams[streamId] = stream
conn.incomingStreams.addLastNoWait(stream)

await conn.incomingStreams.addLast(stream)
await stream.sendControlMessage(DataChannelMessage(messageType: Ack))

proc readLoop(conn: DataChannelConnection) {.async.} =
proc readLoop(conn: DataChannelConnection) {.async: (raises: [CancelledError]).} =
try:
while true:
let message = await conn.conn.read()
Expand All @@ -219,12 +220,13 @@ proc readLoop(conn: DataChannelConnection) {.async.} =
#TODO should we really await?
await conn.handleControl(message)
else:
conn.handleData(message)
await conn.handleData(message)

except CatchableError as exc:
discard

proc accept*(conn: DataChannelConnection): Future[DataChannelStream] {.async.} =
proc accept*(conn: DataChannelConnection): Future[DataChannelStream]
{.async: (raises: [CancelledError]).} =
return await conn.incomingStreams.popFirst()

proc new*(_: type DataChannelConnection, conn: SctpConn): DataChannelConnection =
Expand Down

0 comments on commit 39cf693

Please sign in to comment.