Skip to content

Commit

Permalink
add webrtc client
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahad-10 committed Dec 11, 2024
1 parent 7f82aeb commit 57c4bc6
Show file tree
Hide file tree
Showing 9 changed files with 427 additions and 1 deletion.
5 changes: 4 additions & 1 deletion app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {

android {
namespace = "io.xconn.wampwebrtc"
compileSdk = 34
compileSdk = 35

defaultConfig {
applicationId = "io.xconn.wampwebrtc"
Expand Down Expand Up @@ -42,4 +42,7 @@ dependencies {
testImplementation(libs.junit)
androidTestImplementation(libs.androidx.junit)
androidTestImplementation(libs.androidx.espresso.core)
implementation("io.xconn:xconn:0.1.0-alpha.4")
implementation("io.xconn:wampproto:0.1.1")
implementation("io.getstream:stream-webrtc-android:1.2.2")
}
1 change: 1 addition & 0 deletions app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools">

<uses-permission android:name="android.permission.INTERNET"/>
<application
android:allowBackup="true"
android:dataExtractionRules="@xml/data_extraction_rules"
Expand Down
37 changes: 37 additions & 0 deletions app/src/main/java/io/xconn/wampwebrtc/Helpers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.xconn.wampwebrtc

import io.xconn.wampproto.Joiner
import io.xconn.wampproto.serializers.Serializer
import org.json.JSONObject

fun convertJsonToMap(jsonString: String): Map<String, Any> {
val jsonObject = JSONObject(jsonString)
val map = mutableMapOf<String, Any>()

jsonObject.keys().forEach { key ->
val value = jsonObject.get(key)
map[key] = value
}

return map
}

suspend fun join(peer: Peer, realm: String, serializer: Serializer): PeerBaseSession {
val joiner = Joiner(realm, serializer)
val hello = joiner.sendHello()

peer.send(hello as ByteArray)

while (true) {
val msg = peer.receive()
val toSend = joiner.receive(msg)

if (toSend == null) {
val sessionDetails = joiner.getSessionDetails()
val base = PeerBaseSession(peer, sessionDetails, serializer)
return base
}

peer.send(toSend as ByteArray)
}
}
35 changes: 35 additions & 0 deletions app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.xconn.wampwebrtc

import java.nio.ByteBuffer

class MessageAssembler {
private val builder = ByteBuffer.allocate(1024 * 1024) // Allocate 1MB buffer, adjust as needed

fun feed(data: ByteArray): ByteArray? {
builder.put(data, 1, data.size - 1)
val isFinal = data[0]
return if (isFinal == 1.toByte()) {
val result = ByteArray(builder.position())
builder.flip()
builder.get(result)
builder.clear()
result
} else {
null
}
}

fun chunkMessage(message: ByteArray): Sequence<ByteArray> = sequence {
val chunkSize = 16 * 1024 - 1 // 16KB - 1 byte for metadata
val totalChunks = (message.size + chunkSize - 1) / chunkSize

for (i in 0 until totalChunks) {
val start = i * chunkSize
val end = if (i == totalChunks - 1) message.size else start + chunkSize
val chunk = message.copyOfRange(start, end)

val isFinal = if (i == totalChunks - 1) 1.toByte() else 0.toByte()
yield(byteArrayOf(isFinal) + chunk)
}
}
}
121 changes: 121 additions & 0 deletions app/src/main/java/io/xconn/wampwebrtc/Offerer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.xconn.wampwebrtc

import android.content.Context
import org.webrtc.*
import java.util.concurrent.LinkedBlockingDeque
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

class Offerer(
context: Context,
private val queue: LinkedBlockingDeque<ByteArray>,
private val signalIceCandidate: (IceCandidate) -> Unit,
) {
init {
val options = PeerConnectionFactory.InitializationOptions.builder(context)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)
}

var peerConnection: PeerConnection? = null
var dataChannel: DataChannel? = null
val peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory()
private var assembler = MessageAssembler()

suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? {
val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers)

peerConnection = peerConnectionFactory.createPeerConnection(
configuration,
object : PeerConnection.Observer {
override fun onIceCandidate(candidate: IceCandidate?) {
candidate?.let {
signalIceCandidate(it)
peerConnection?.addIceCandidate(it)
}
}

override fun onDataChannel(channel: DataChannel?) {
channel?.registerObserver(object : DataChannel.Observer {
override fun onMessage(buffer: DataChannel.Buffer?) {}

override fun onBufferedAmountChange(p0: Long) {}
override fun onStateChange() {}
})
}

override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}
override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {}
override fun onIceConnectionReceivingChange(p0: Boolean) {}
override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {}
override fun onAddStream(p0: MediaStream?) {}
override fun onRemoveStream(p0: MediaStream?) {}
override fun onRenegotiationNeeded() {}
override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {}
})

// Create and set up the data channel
val conf = DataChannel.Init().apply {
id = offerConfig.id
ordered = offerConfig.ordered
protocol = offerConfig.protocol
}
dataChannel = peerConnection?.createDataChannel("wamp", conf)

return suspendCoroutine { continuation ->
peerConnection?.createOffer(object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
peerConnection?.setLocalDescription(object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {}
override fun onSetSuccess() {
continuation.resume(description)
}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}

}, description)
}

override fun onSetSuccess() {}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}
}, MediaConstraints())
}
}

suspend fun waitForDataChannelOpen(): Unit = suspendCoroutine { continuation ->
dataChannel?.registerObserver(object : DataChannel.Observer {
override fun onStateChange() {
if (dataChannel?.state() == DataChannel.State.OPEN) {
continuation.resume(Unit)
}
}

override fun onBufferedAmountChange(p0: Long) {}
override fun onMessage(buffer: DataChannel.Buffer?) {
buffer?.data?.let {
val data = ByteArray(it.remaining())
it.get(data)

val message = assembler.feed(data)
if (message != null) {
queue.put(message)
}
}
}
})
}

fun setRemoteDescription(sessionDescription: SessionDescription) {
peerConnection?.setRemoteDescription(object : SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onSetSuccess() {}
override fun onCreateFailure(p0: String?) {}
override fun onSetFailure(p0: String?) {}
}, sessionDescription)
}

fun addIceCandidate(candidate: IceCandidate) {
peerConnection?.addIceCandidate(candidate)
}
}
71 changes: 71 additions & 0 deletions app/src/main/java/io/xconn/wampwebrtc/Types.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.xconn.wampwebrtc

import io.xconn.wampproto.SessionDetails
import io.xconn.wampproto.messages.Message
import io.xconn.wampproto.serializers.Serializer
import io.xconn.xconn.IBaseSession
import org.webrtc.DataChannel
import org.webrtc.PeerConnection
import org.webrtc.PeerConnection.IceServer

interface Peer {
suspend fun send(data: ByteArray)

suspend fun sendMessage(message: Message)

suspend fun receive(): Any

suspend fun receiveMessage(): Message

fun close()
}

class PeerBaseSession(
private val peer: Peer,
private val sessionDetails: SessionDetails,
private val serializer: Serializer,
) : IBaseSession {
override fun id(): Long = sessionDetails.sessionID

override fun realm(): String = sessionDetails.realm

override fun authid(): String = sessionDetails.authid

override fun authrole(): String = sessionDetails.authrole

override fun serializer(): Serializer = serializer

override suspend fun send(data: Any) {
peer.send(data as ByteArray)
}

override suspend fun receive(): Any = peer.receive()

override suspend fun sendMessage(msg: Message) {
peer.sendMessage(msg)
}

override suspend fun receiveMessage(): Message = peer.receiveMessage()

override suspend fun close() = peer.close()
}

data class ClientConfig(
val url: String,
val realm: String,
val procedureWebRTCOffer: String,
val topicAnswererOnCandidate: String,
val serializer: Serializer,
val subProtocol: String,
val iceServers: List<IceServer>,
)

data class OfferConfig(
val protocol: String,
val iceServers: List<IceServer>,
val ordered: Boolean,
val id: Int,
val topicAnswererOnCandidate: String,
)

data class WebRTCSession(val connection: PeerConnection, val channel: DataChannel)
19 changes: 19 additions & 0 deletions app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.xconn.wampwebrtc

import android.content.Context
import io.xconn.xconn.Session
import java.util.concurrent.LinkedBlockingDeque

class WAMPSession(private val context: Context) {

suspend fun connect(config: ClientConfig): Session {
val queue = LinkedBlockingDeque<ByteArray>()
val webRTCConnection = WebRTC(context, queue)
val webRTCSession = webRTCConnection.connect(config)

val peer = WebRTCPeer(webRTCSession.channel, config.serializer, queue)
val baseSession = join(peer, config.realm, config.serializer)

return Session(baseSession)
}
}
Loading

0 comments on commit 57c4bc6

Please sign in to comment.