Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve behavior for realtime channel creation and improve docs #831

Merged
merged 6 commits into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
*/
fun disconnect()

@SupabaseInternal
fun Realtime.addChannel(channel: RealtimeChannel)

/**
* Unsubscribes and removes a channel from the [subscriptions]
* @param channel The channel to remove
Expand Down Expand Up @@ -103,6 +100,17 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
*/
suspend fun setAuth(token: String? = null)

/**
* Creates a new [RealtimeChannel] and adds it to the [subscriptions]
*
* - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so.
* - If a channel with the same [channelId] already exists, it will be returned
*
* @param channelId The id of the channel
* @param builder The builder for the channel
*/
fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel

/**
* @property websocketConfig Custom configuration for the Ktor Websocket Client. This only applies if [Realtime.Config.websocketFactory] is null.
* @property secure Whether to use wss or ws. Defaults to [SupabaseClient.useHTTPS] when null
Expand Down Expand Up @@ -187,11 +195,15 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
}

/**
* Creates a new [RealtimeChannel]
* Creates a new [RealtimeChannel] and adds it to the [Realtime.subscriptions]
*
* - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so.
* - If a channel with the same [channelId] already exists, it will be returned
*
* @param channelId The id of the channel
* @param builder The builder for the channel
*/
inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel {
return RealtimeChannelBuilder("realtime:$channelId", this as RealtimeImpl).apply(builder).build()
}
inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel = channel(channelId, RealtimeChannelBuilder(RealtimeTopic.withChannelId(channelId)).apply(builder))

/**
* Supabase Realtime is a way to listen to changes in the PostgreSQL database via websockets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.github.jan.supabase.realtime.annotations.ChannelDsl
* Used to build a realtime channel
*/
@ChannelDsl
class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String, private val realtimeImpl: RealtimeImpl) {
class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String) {

private var broadcastJoinConfig = BroadcastJoinConfig(acknowledgeBroadcasts = false, receiveOwnBroadcasts = false)
private var presenceJoinConfig = PresenceJoinConfig("")
Expand All @@ -33,9 +33,9 @@ class RealtimeChannelBuilder @PublishedApi internal constructor(private val topi
}

@SupabaseInternal
fun build(): RealtimeChannel {
fun build(realtime: Realtime): RealtimeChannel {
return RealtimeChannelImpl(
realtimeImpl,
realtime,
topic,
broadcastJoinConfig,
presenceJoinConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ import kotlin.reflect.KClass
import kotlin.reflect.KType

internal class RealtimeChannelImpl(
private val realtimeImpl: RealtimeImpl,
override val realtime: Realtime,
override val topic: String,
private val broadcastJoinConfig: BroadcastJoinConfig,
private val presenceJoinConfig: PresenceJoinConfig,
private val isPrivate: Boolean,
) : RealtimeChannel {

private val realtimeImpl: RealtimeImpl = realtime as RealtimeImpl
private val clientChanges = AtomicMutableList<PostgresJoinConfig>()
@SupabaseInternal
override val callbackManager = CallbackManagerImpl(realtimeImpl.serializer)
private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED)
override val status = _status.asStateFlow()
override val realtime: Realtime = realtimeImpl
override val supabaseClient = realtimeImpl.supabaseClient

private val broadcastUrl = realtimeImpl.broadcastUrl()
private val subTopic = topic.replaceFirst(Regex("^realtime:", RegexOption.IGNORE_CASE), "")
private val subTopic = topic.replaceFirst(Regex("^${RealtimeTopic.PREFIX}:", RegexOption.IGNORE_CASE), "")
private val httpClient = realtimeImpl.supabaseClient.httpClient

private suspend fun accessToken() = realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken
Expand All @@ -54,9 +54,6 @@ internal class RealtimeChannelImpl(
if(!realtimeImpl.config.connectOnSubscribe) error("You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?")
realtimeImpl.connect()
}
realtimeImpl.run {
addChannel(this@RealtimeChannelImpl)
}
_status.value = RealtimeChannel.Status.SUBSCRIBING
Realtime.logger.d { "Subscribing to channel $topic" }
val currentJwt = accessToken()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.jan.supabase.realtime

import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.Auth
import io.github.jan.supabase.auth.status.SessionStatus
import io.github.jan.supabase.buildUrl
Expand Down Expand Up @@ -162,6 +161,14 @@ import kotlin.io.encoding.ExperimentalEncodingApi
_status.value = Realtime.Status.DISCONNECTED
}

override fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel {
val topic = RealtimeTopic.withChannelId(channelId)
if(subscriptions.containsKey(topic)) return subscriptions[topic]!!
val channel = builder.build(this)
_subscriptions[topic] = channel
return channel
}

private suspend fun onMessage(message: RealtimeMessage) {
Realtime.logger.d { "Received message $message" }
val channel = subscriptions[message.topic] as? RealtimeChannelImpl
Expand Down Expand Up @@ -234,11 +241,6 @@ import kotlin.io.encoding.ExperimentalEncodingApi
}
}

@SupabaseInternal
override fun Realtime.addChannel(channel: RealtimeChannel) {
_subscriptions[channel.topic] = channel
}

override suspend fun close() {
disconnect()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.github.jan.supabase.realtime

@PublishedApi
internal object RealtimeTopic {

const val PREFIX = "realtime"

fun withChannelId(channelId: String): String {
return "$PREFIX:$channelId"
}

}
17 changes: 17 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeImpl
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.channel
import io.github.jan.supabase.realtime.realtime
import io.ktor.util.encodeBase64
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -31,6 +32,22 @@ class RealtimeTest {
}
}

@Test
fun testExistingChannelShouldBeReturned() {
runTest {
createTestClient(
wsHandler = { _, _ ->
//Does not matter for this test
},
supabaseHandler = {
val channel = it.realtime.channel("channelId")
val channel2 = it.realtime.channel("channelId")
assertEquals(channel, channel2)
}
)
}
}

@Test
fun testSendingRealtimeMessages() {
val expectedMessage = RealtimeMessage(
Expand Down
7 changes: 4 additions & 3 deletions Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.github.jan.supabase.realtime.Presence
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_PRESENCE_DIFF
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_SYSTEM
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.RealtimeTopic
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
Expand All @@ -29,18 +30,18 @@ suspend fun Auth.importAuthTokenValid(token: String) {

suspend fun handleSubscribe(incoming: ReceiveChannel<RealtimeMessage>, outgoing: SendChannel<RealtimeMessage>, channelId: String) {
incoming.receive()
outgoing.send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
outgoing.send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
}

suspend fun SendChannel<RealtimeMessage>.sendBroadcast(channelId: String, event: String, message: JsonObject) {
send(RealtimeMessage("realtime:$channelId", "broadcast", buildJsonObject {
send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), "broadcast", buildJsonObject {
put("event", event)
put("payload", message)
}, ""))
}

suspend fun SendChannel<RealtimeMessage>.sendPresence(channelId: String, joins: Map<String, Presence>, leaves: Map<String, Presence>) {
send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject {
send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject {
put("joins", transformPresenceMap(joins))
put("leaves", transformPresenceMap(leaves))
}, ""))
Expand Down
19 changes: 19 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTopicTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import io.github.jan.supabase.realtime.RealtimeTopic
import kotlin.test.Test
import kotlin.test.assertEquals

class RealtimeTopicTest {

@Test
fun testRealtimeTopic() {
val channelId = "channelId"
val topic = RealtimeTopic.withChannelId(channelId)
assertEquals("realtime:channelId", topic)
}

@Test
fun testRealtimePrefix() {
assertEquals("realtime", RealtimeTopic.PREFIX)
}

}
Loading