Skip to content

Commit

Permalink
feat: add close stream management
Browse files Browse the repository at this point in the history
  • Loading branch information
lchenut committed Oct 11, 2024
1 parent d0cff33 commit 8aa09b7
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions webrtc/datachannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,32 @@ proc ordered(t: DataChannelType): bool =
# -- DataChannelStream --

type
#TODO handle closing
DataChannelOnClose* = proc() {.raises: [], gcsafe.}
DataChannelStream* = ref object
id: uint16
conn: SctpConn
reliability: DataChannelType
reliabilityParameter: uint32
receivedData: AsyncQueue[seq[byte]]
acked: bool
onClose: seq[DataChannelOnClose]

#TODO handle closing
DataChannelConnection* = ref object
readLoopFut: Future[void]
streams: Table[uint16, DataChannelStream]
streamId: uint16
conn*: SctpConn
incomingStreams: AsyncQueue[DataChannelStream]

proc addOnClose*(stream: DataChannelStream, onCloseProc: DataChannelOnClose) =
stream.onClose.add(onCloseProc)

proc read*(
stream: DataChannelStream
): Future[seq[byte]] {.async: (raises: [CancelledError]).} =
let x = await stream.receivedData.popFirst()
trace "read", length = x.len(), id = stream.id
return x
let data = await stream.receivedData.popFirst()
trace "read", length = data.len(), id = stream.id
return data

proc write*(
stream: DataChannelStream, buf: seq[byte]
Expand All @@ -99,7 +102,6 @@ proc write*(

if stream.acked:
sendInfo.unordered = not stream.reliability.ordered
#TODO add reliability params

if buf.len == 0:
trace "Datachannel write empty"
Expand All @@ -117,11 +119,13 @@ proc sendControlMessage(
streamId: stream.id, endOfRecord: true, protocolId: uint32(WebRtcDcep)
)
trace "send control message", msg

await stream.conn.write(encoded, sendInfo)

proc closeStream*(stream: DataChannelStream) =
stream.conn.closeChannel(stream.id)
for onCloseProc in stream.onClose:
onCloseProc()
stream.onClose = @[]

proc openStream*(
conn: DataChannelConnection,
Expand Down Expand Up @@ -152,7 +156,10 @@ proc openStream*(
receivedData: newAsyncQueue[seq[byte]](),
)

proc cleanup() =
conn.streams.del(streamId)
conn.streams[streamId] = stream
stream.addOnClose(cleanup)

let msg = DataChannelMessage(
messageType: Open,
Expand All @@ -170,7 +177,6 @@ proc handleData(
trace "handle data message", streamId, ppid = msg.params.protocolId, data = msg.data

conn.streams.withValue(streamId, stream):
#TODO handle string vs binary
if msg.params.protocolId in [uint32(WebRtcStringEmpty), uint32(WebRtcBinaryEmpty)]:
# PPID indicate empty message
await stream.receivedData.addLast(@[])
Expand Down Expand Up @@ -218,9 +224,7 @@ proc readLoop(conn: DataChannelConnection) {.async: (raises: [CancelledError]).}
try:
while true:
let message = await conn.conn.read()
# TODO: check the protocolId
if message.params.protocolId == uint32(WebRtcDcep):
#TODO should we really await?
await conn.handleControl(message)
else:
await conn.handleData(message)
Expand Down

0 comments on commit 8aa09b7

Please sign in to comment.