diff --git a/webrtc/datachannel.nim b/webrtc/datachannel.nim index e4061de..24fe8d2 100644 --- a/webrtc/datachannel.nim +++ b/webrtc/datachannel.nim @@ -65,7 +65,7 @@ proc ordered(t: DataChannelType): bool = # -- DataChannelStream -- type - #TODO handle closing + DataChannelOnClose* = proc() {.raises: [], gcsafe.} DataChannelStream* = ref object id: uint16 conn: SctpConn @@ -73,8 +73,8 @@ type reliabilityParameter: uint32 receivedData: AsyncQueue[seq[byte]] acked: bool + onClose: seq[DataChannelOnClose] - #TODO handle closing DataChannelConnection* = ref object readLoopFut: Future[void] streams: Table[uint16, DataChannelStream] @@ -82,12 +82,15 @@ type conn*: SctpConn incomingStreams: AsyncQueue[DataChannelStream] +proc addOnClose*(stream: DataChannelStream, onCloseProc: DataChannelOnClose) = + stream.onClose.add(onCloseProc) + proc read*( stream: DataChannelStream ): Future[seq[byte]] {.async: (raises: [CancelledError]).} = - let x = await stream.receivedData.popFirst() - trace "read", length = x.len(), id = stream.id - return x + let data = await stream.receivedData.popFirst() + trace "read", length = data.len(), id = stream.id + return data proc write*( stream: DataChannelStream, buf: seq[byte] @@ -99,7 +102,6 @@ proc write*( if stream.acked: sendInfo.unordered = not stream.reliability.ordered - #TODO add reliability params if buf.len == 0: trace "Datachannel write empty" @@ -117,11 +119,13 @@ proc sendControlMessage( streamId: stream.id, endOfRecord: true, protocolId: uint32(WebRtcDcep) ) trace "send control message", msg - await stream.conn.write(encoded, sendInfo) proc closeStream*(stream: DataChannelStream) = stream.conn.closeChannel(stream.id) + for onCloseProc in stream.onClose: + onCloseProc() + stream.onClose = @[] proc openStream*( conn: DataChannelConnection, @@ -152,7 +156,10 @@ proc openStream*( receivedData: newAsyncQueue[seq[byte]](), ) + proc cleanup() = + conn.streams.del(streamId) conn.streams[streamId] = stream + stream.addOnClose(cleanup) let msg = DataChannelMessage( messageType: Open, @@ -170,7 +177,6 @@ proc handleData( trace "handle data message", streamId, ppid = msg.params.protocolId, data = msg.data conn.streams.withValue(streamId, stream): - #TODO handle string vs binary if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]: # PPID indicate empty message await stream.receivedData.addLast(@[]) @@ -218,9 +224,7 @@ proc readLoop(conn: DataChannelConnection) {.async: (raises: [CancelledError]).} try: while true: let message = await conn.conn.read() - # TODO: check the protocolId if message.params.protocolId == uint32(WebRtcDcep): - #TODO should we really await? await conn.handleControl(message) else: await conn.handleData(message)