From 39cf693928695dac83225faa9cd31cd4c98b8e18 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 4 Oct 2024 13:43:42 +0200 Subject: [PATCH] feat: add async exception tracking --- webrtc/datachannel.nim | 76 ++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/webrtc/datachannel.nim b/webrtc/datachannel.nim index 611bc49..61f7c4f 100644 --- a/webrtc/datachannel.nim +++ b/webrtc/datachannel.nim @@ -8,12 +8,8 @@ # those terms. import tables - -import chronos, - chronicles, - binary_serialization - -import sctp +import chronos, chronicles, binary_serialization +import errors, sctp/[sctp_transport, sctp_connection] export binary_serialization @@ -85,12 +81,12 @@ type conn*: SctpConn incomingStreams: AsyncQueue[DataChannelStream] -proc read*(stream: DataChannelStream): Future[seq[byte]] {.async.} = +proc read*(stream: DataChannelStream): Future[seq[byte]] {.async: (raises: [CancelledError]).} = let x = await stream.receivedData.popFirst() trace "read", length=x.len(), id=stream.id return x -proc write*(stream: DataChannelStream, buf: seq[byte]) {.async.} = +proc write*(stream: DataChannelStream, buf: seq[byte]) {.async: (raises: [CancelledError, WebRtcError]).} = trace "write", length=buf.len(), id=stream.id var sendInfo = SctpMessageParameters( @@ -110,7 +106,7 @@ proc write*(stream: DataChannelStream, buf: seq[byte]) {.async.} = else: await stream.conn.write(buf, sendInfo) -proc sendControlMessage(stream: DataChannelStream, msg: DataChannelMessage) {.async.} = +proc sendControlMessage(stream: DataChannelStream, msg: DataChannelMessage) {.async: (raises: [CancelledError, WebRtcError]).} = let encoded = Binary.encode(msg) sendInfo = SctpMessageParameters( @@ -168,49 +164,54 @@ proc openStream*( await stream.sendControlMessage(msg) return stream -proc handleData(conn: DataChannelConnection, msg: SctpMessage) = +proc handleData(conn: DataChannelConnection, msg: SctpMessage) {.async: (raises: [CancelledError, WebRtcError]).} = let streamId = msg.params.streamId trace "handle data message", streamId, ppid = msg.params.protocolId, data = msg.data - if streamId notin conn.streams: - raise newException(ValueError, "got data for unknown streamid") - - let stream = conn.streams[streamId] - - #TODO handle string vs binary - if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]: - # PPID indicate empty message - stream.receivedData.addLastNoWait(@[]) - else: - stream.receivedData.addLastNoWait(msg.data) - -proc handleControl(conn: DataChannelConnection, msg: SctpMessage) {.async.} = - let - decoded = Binary.decode(msg.data, DataChannelMessage) - streamId = msg.params.streamId + conn.streams.withValue(streamId, stream): + #TODO handle string vs binary + if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]: + # PPID indicate empty message + await stream.receivedData.addLast(@[]) + else: + await stream.receivedData.addLast(msg.data) + do: + raise newException(WebRtcError, "DataChannel - Got data for unknown StreamID") + +proc handleControl( + conn: DataChannelConnection, msg: SctpMessage + ) {.async: (raises: [CancelledError, WebRtcError]).} = + let decoded = + try: + Binary.decode(msg.data, DataChannelMessage) + except SerializationError as exc: + raise newException(WebRtcError, "DataChannel - " & exc.msg, exc) + let streamId = msg.params.streamId trace "handle control message", decoded, streamId = msg.params.streamId if decoded.messageType == Ack: - if streamId notin conn.streams: - raise newException(ValueError, "got ack for unknown streamid") - conn.streams[streamId].acked = true + conn.streams.withValue(streamId, stream): + if stream.acked == true: + trace "Received ACK twice on the same StreamID", streamId + stream.acked = true + do: + raise newException(WebRtcError, "DataChannel - Got ACK for unknown StreamID") elif decoded.messageType == Open: if streamId in conn.streams: - raise newException(ValueError, "got open for already existing streamid") - + raise newException(WebRtcError, "DataChannel - Got open for already existing StreamID") let stream = DataChannelStream( - id: streamId, conn: conn.conn, + id: streamId, + conn: conn.conn, reliability: decoded.openMessage.channelType, reliabilityParameter: decoded.openMessage.reliabilityParameter, receivedData: newAsyncQueue[seq[byte]]() ) conn.streams[streamId] = stream - conn.incomingStreams.addLastNoWait(stream) - + await conn.incomingStreams.addLast(stream) await stream.sendControlMessage(DataChannelMessage(messageType: Ack)) -proc readLoop(conn: DataChannelConnection) {.async.} = +proc readLoop(conn: DataChannelConnection) {.async: (raises: [CancelledError]).} = try: while true: let message = await conn.conn.read() @@ -219,12 +220,13 @@ proc readLoop(conn: DataChannelConnection) {.async.} = #TODO should we really await? await conn.handleControl(message) else: - conn.handleData(message) + await conn.handleData(message) except CatchableError as exc: discard -proc accept*(conn: DataChannelConnection): Future[DataChannelStream] {.async.} = +proc accept*(conn: DataChannelConnection): Future[DataChannelStream] + {.async: (raises: [CancelledError]).} = return await conn.incomingStreams.popFirst() proc new*(_: type DataChannelConnection, conn: SctpConn): DataChannelConnection =