From 57c4bc6f2d39fea592b2577920529354b5d830a1 Mon Sep 17 00:00:00 2001 From: Mahad Date: Wed, 11 Dec 2024 18:28:54 +0500 Subject: [PATCH] add webrtc client --- app/build.gradle.kts | 5 +- app/src/main/AndroidManifest.xml | 1 + .../main/java/io/xconn/wampwebrtc/Helpers.kt | 37 ++++++ .../io/xconn/wampwebrtc/MessageAssembler.kt | 35 +++++ .../main/java/io/xconn/wampwebrtc/Offerer.kt | 121 ++++++++++++++++++ .../main/java/io/xconn/wampwebrtc/Types.kt | 71 ++++++++++ .../java/io/xconn/wampwebrtc/WAMPSession.kt | 19 +++ .../main/java/io/xconn/wampwebrtc/WebRTC.kt | 100 +++++++++++++++ .../java/io/xconn/wampwebrtc/WebRTCPeer.kt | 39 ++++++ 9 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 app/src/main/java/io/xconn/wampwebrtc/Helpers.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/Offerer.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/Types.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt create mode 100644 app/src/main/java/io/xconn/wampwebrtc/WebRTCPeer.kt diff --git a/app/build.gradle.kts b/app/build.gradle.kts index d956ec2..c0f3f1b 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -5,7 +5,7 @@ plugins { android { namespace = "io.xconn.wampwebrtc" - compileSdk = 34 + compileSdk = 35 defaultConfig { applicationId = "io.xconn.wampwebrtc" @@ -42,4 +42,7 @@ dependencies { testImplementation(libs.junit) androidTestImplementation(libs.androidx.junit) androidTestImplementation(libs.androidx.espresso.core) + implementation("io.xconn:xconn:0.1.0-alpha.4") + implementation("io.xconn:wampproto:0.1.1") + implementation("io.getstream:stream-webrtc-android:1.2.2") } diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index f1fc006..cf4d137 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -2,6 +2,7 @@ + { + val jsonObject = JSONObject(jsonString) + val map = mutableMapOf() + + jsonObject.keys().forEach { key -> + val value = jsonObject.get(key) + map[key] = value + } + + return map +} + +suspend fun join(peer: Peer, realm: String, serializer: Serializer): PeerBaseSession { + val joiner = Joiner(realm, serializer) + val hello = joiner.sendHello() + + peer.send(hello as ByteArray) + + while (true) { + val msg = peer.receive() + val toSend = joiner.receive(msg) + + if (toSend == null) { + val sessionDetails = joiner.getSessionDetails() + val base = PeerBaseSession(peer, sessionDetails, serializer) + return base + } + + peer.send(toSend as ByteArray) + } +} diff --git a/app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt b/app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt new file mode 100644 index 0000000..4472c93 --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/MessageAssembler.kt @@ -0,0 +1,35 @@ +package io.xconn.wampwebrtc + +import java.nio.ByteBuffer + +class MessageAssembler { + private val builder = ByteBuffer.allocate(1024 * 1024) // Allocate 1MB buffer, adjust as needed + + fun feed(data: ByteArray): ByteArray? { + builder.put(data, 1, data.size - 1) + val isFinal = data[0] + return if (isFinal == 1.toByte()) { + val result = ByteArray(builder.position()) + builder.flip() + builder.get(result) + builder.clear() + result + } else { + null + } + } + + fun chunkMessage(message: ByteArray): Sequence = sequence { + val chunkSize = 16 * 1024 - 1 // 16KB - 1 byte for metadata + val totalChunks = (message.size + chunkSize - 1) / chunkSize + + for (i in 0 until totalChunks) { + val start = i * chunkSize + val end = if (i == totalChunks - 1) message.size else start + chunkSize + val chunk = message.copyOfRange(start, end) + + val isFinal = if (i == totalChunks - 1) 1.toByte() else 0.toByte() + yield(byteArrayOf(isFinal) + chunk) + } + } +} diff --git a/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt b/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt new file mode 100644 index 0000000..7040373 --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/Offerer.kt @@ -0,0 +1,121 @@ +package io.xconn.wampwebrtc + +import android.content.Context +import org.webrtc.* +import java.util.concurrent.LinkedBlockingDeque +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +class Offerer( + context: Context, + private val queue: LinkedBlockingDeque, + private val signalIceCandidate: (IceCandidate) -> Unit, +) { + init { + val options = PeerConnectionFactory.InitializationOptions.builder(context) + .createInitializationOptions() + PeerConnectionFactory.initialize(options) + } + + var peerConnection: PeerConnection? = null + var dataChannel: DataChannel? = null + val peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory() + private var assembler = MessageAssembler() + + suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? { + val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers) + + peerConnection = peerConnectionFactory.createPeerConnection( + configuration, + object : PeerConnection.Observer { + override fun onIceCandidate(candidate: IceCandidate?) { + candidate?.let { + signalIceCandidate(it) + peerConnection?.addIceCandidate(it) + } + } + + override fun onDataChannel(channel: DataChannel?) { + channel?.registerObserver(object : DataChannel.Observer { + override fun onMessage(buffer: DataChannel.Buffer?) {} + + override fun onBufferedAmountChange(p0: Long) {} + override fun onStateChange() {} + }) + } + + override fun onSignalingChange(p0: PeerConnection.SignalingState?) {} + override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {} + override fun onIceConnectionReceivingChange(p0: Boolean) {} + override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {} + override fun onAddStream(p0: MediaStream?) {} + override fun onRemoveStream(p0: MediaStream?) {} + override fun onRenegotiationNeeded() {} + override fun onIceCandidatesRemoved(p0: Array?) {} + }) + + // Create and set up the data channel + val conf = DataChannel.Init().apply { + id = offerConfig.id + ordered = offerConfig.ordered + protocol = offerConfig.protocol + } + dataChannel = peerConnection?.createDataChannel("wamp", conf) + + return suspendCoroutine { continuation -> + peerConnection?.createOffer(object : SdpObserver { + override fun onCreateSuccess(description: SessionDescription?) { + peerConnection?.setLocalDescription(object : SdpObserver { + override fun onCreateSuccess(description: SessionDescription?) {} + override fun onSetSuccess() { + continuation.resume(description) + } + override fun onCreateFailure(p0: String?) {} + override fun onSetFailure(p0: String?) {} + + }, description) + } + + override fun onSetSuccess() {} + override fun onCreateFailure(p0: String?) {} + override fun onSetFailure(p0: String?) {} + }, MediaConstraints()) + } + } + + suspend fun waitForDataChannelOpen(): Unit = suspendCoroutine { continuation -> + dataChannel?.registerObserver(object : DataChannel.Observer { + override fun onStateChange() { + if (dataChannel?.state() == DataChannel.State.OPEN) { + continuation.resume(Unit) + } + } + + override fun onBufferedAmountChange(p0: Long) {} + override fun onMessage(buffer: DataChannel.Buffer?) { + buffer?.data?.let { + val data = ByteArray(it.remaining()) + it.get(data) + + val message = assembler.feed(data) + if (message != null) { + queue.put(message) + } + } + } + }) + } + + fun setRemoteDescription(sessionDescription: SessionDescription) { + peerConnection?.setRemoteDescription(object : SdpObserver { + override fun onCreateSuccess(p0: SessionDescription?) {} + override fun onSetSuccess() {} + override fun onCreateFailure(p0: String?) {} + override fun onSetFailure(p0: String?) {} + }, sessionDescription) + } + + fun addIceCandidate(candidate: IceCandidate) { + peerConnection?.addIceCandidate(candidate) + } +} diff --git a/app/src/main/java/io/xconn/wampwebrtc/Types.kt b/app/src/main/java/io/xconn/wampwebrtc/Types.kt new file mode 100644 index 0000000..454ed0e --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/Types.kt @@ -0,0 +1,71 @@ +package io.xconn.wampwebrtc + +import io.xconn.wampproto.SessionDetails +import io.xconn.wampproto.messages.Message +import io.xconn.wampproto.serializers.Serializer +import io.xconn.xconn.IBaseSession +import org.webrtc.DataChannel +import org.webrtc.PeerConnection +import org.webrtc.PeerConnection.IceServer + +interface Peer { + suspend fun send(data: ByteArray) + + suspend fun sendMessage(message: Message) + + suspend fun receive(): Any + + suspend fun receiveMessage(): Message + + fun close() +} + +class PeerBaseSession( + private val peer: Peer, + private val sessionDetails: SessionDetails, + private val serializer: Serializer, +) : IBaseSession { + override fun id(): Long = sessionDetails.sessionID + + override fun realm(): String = sessionDetails.realm + + override fun authid(): String = sessionDetails.authid + + override fun authrole(): String = sessionDetails.authrole + + override fun serializer(): Serializer = serializer + + override suspend fun send(data: Any) { + peer.send(data as ByteArray) + } + + override suspend fun receive(): Any = peer.receive() + + override suspend fun sendMessage(msg: Message) { + peer.sendMessage(msg) + } + + override suspend fun receiveMessage(): Message = peer.receiveMessage() + + override suspend fun close() = peer.close() +} + +data class ClientConfig( + val url: String, + val realm: String, + val procedureWebRTCOffer: String, + val topicAnswererOnCandidate: String, + val serializer: Serializer, + val subProtocol: String, + val iceServers: List, +) + +data class OfferConfig( + val protocol: String, + val iceServers: List, + val ordered: Boolean, + val id: Int, + val topicAnswererOnCandidate: String, +) + +data class WebRTCSession(val connection: PeerConnection, val channel: DataChannel) diff --git a/app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt b/app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt new file mode 100644 index 0000000..dd864d8 --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/WAMPSession.kt @@ -0,0 +1,19 @@ +package io.xconn.wampwebrtc + +import android.content.Context +import io.xconn.xconn.Session +import java.util.concurrent.LinkedBlockingDeque + +class WAMPSession(private val context: Context) { + + suspend fun connect(config: ClientConfig): Session { + val queue = LinkedBlockingDeque() + val webRTCConnection = WebRTC(context, queue) + val webRTCSession = webRTCConnection.connect(config) + + val peer = WebRTCPeer(webRTCSession.channel, config.serializer, queue) + val baseSession = join(peer, config.realm, config.serializer) + + return Session(baseSession) + } +} diff --git a/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt b/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt new file mode 100644 index 0000000..6c2551b --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt @@ -0,0 +1,100 @@ +package io.xconn.wampwebrtc + +import android.content.Context +import io.xconn.wampproto.serializers.CBORSerializer +import io.xconn.xconn.Client +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.json.JSONArray +import org.json.JSONObject +import org.webrtc.DataChannel +import org.webrtc.IceCandidate +import org.webrtc.SessionDescription +import java.util.UUID +import java.util.concurrent.LinkedBlockingDeque + +class WebRTC(private val context: Context, private val queue: LinkedBlockingDeque) { + suspend fun connect(config: ClientConfig): WebRTCSession { + val client = Client(serializer = CBORSerializer()) + val session = client.connect(config.url, config.realm) + + val requestID = UUID.randomUUID().toString() + val offerConfig = OfferConfig( + config.subProtocol, config.iceServers, true, 1, config.topicAnswererOnCandidate + ) + + val candidates: MutableList = mutableListOf() + val offerer = + Offerer( + context, + queue, + signalIceCandidate = { candidate -> + candidates.add(candidate) + GlobalScope.launch { + session.publish( + offerConfig.topicAnswererOnCandidate, + listOf( + requestID, + JSONObject( + mapOf( + "sdpMid" to candidate.sdpMid, + "sdpMLineIndex" to candidate.sdpMLineIndex, + "candidate" to candidate.sdp, + ), + ).toString(), + ), + ) + } + }, + ) + + val offer = offerer.createOffer(offerConfig) + Thread.sleep(200) + + val candidatesList = + candidates.map { candidate -> + mapOf( + "sdpMid" to candidate.sdpMid, + "sdpMLineIndex" to candidate.sdpMLineIndex, + "candidate" to candidate.sdp, + ) + } + + val sdpData = + mapOf( + "description" to mapOf("type" to "offer", "sdp" to offer?.description), + "candidates" to candidatesList, + ) + val json = JSONObject(sdpData).toString() + + val res = session.call(config.procedureWebRTCOffer, listOf(requestID, json)).await() + val jsonString = res.args?.get(0) as String + + val result = convertJsonToMap(jsonString) + + val remoteCandidates = result["candidates"] as JSONArray + for (i in 0 until remoteCandidates.length()) { + val candidateObject = remoteCandidates.getJSONObject(i) + val candidate = candidateObject.getString("candidate") + val sdpMid = candidateObject.optString("sdpMid") + val sdpMLineIndex = candidateObject.optInt("sdpMLineIndex") + + val iceCandidate = IceCandidate(sdpMid, sdpMLineIndex, candidate) + offerer.addIceCandidate(iceCandidate) + } + + val descriptionMap = result["description"] as JSONObject + + val sdpString = descriptionMap.getString("sdp") + val sdpType = descriptionMap.getString("type") + + val remoteDescription = + SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString) + + offerer.setRemoteDescription(remoteDescription) + + offerer.waitForDataChannelOpen() + + return WebRTCSession(offerer.peerConnection!!, offerer.dataChannel!!) + } +} diff --git a/app/src/main/java/io/xconn/wampwebrtc/WebRTCPeer.kt b/app/src/main/java/io/xconn/wampwebrtc/WebRTCPeer.kt new file mode 100644 index 0000000..5f20774 --- /dev/null +++ b/app/src/main/java/io/xconn/wampwebrtc/WebRTCPeer.kt @@ -0,0 +1,39 @@ +package io.xconn.wampwebrtc + +import io.xconn.wampproto.messages.Message +import io.xconn.wampproto.serializers.Serializer +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import org.webrtc.DataChannel +import java.nio.ByteBuffer +import java.util.concurrent.LinkedBlockingDeque + +class WebRTCPeer( + var channel: DataChannel?, + private var serializer: Serializer, + private var queue: LinkedBlockingDeque, +) : Peer { + private var assembler = MessageAssembler() + + override suspend fun send(data: ByteArray) { + for (chunk in assembler.chunkMessage(data)) { + channel?.send(DataChannel.Buffer(ByteBuffer.wrap(chunk), false)) + } + } + + override suspend fun sendMessage(message: Message) { + val byteMessage = serializer.serialize(message) + send(byteMessage as ByteArray) + } + + override suspend fun receive(): Any = + withContext(Dispatchers.IO) { + queue.take() + } + + override suspend fun receiveMessage(): Message = serializer.deserialize(receive()) + + override fun close() { + channel?.close() + } +}