diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt index 774d6bf6..20f66571 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt @@ -68,9 +68,6 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug */ fun disconnect() - @SupabaseInternal - fun Realtime.addChannel(channel: RealtimeChannel) - /** * Unsubscribes and removes a channel from the [subscriptions] * @param channel The channel to remove @@ -103,6 +100,17 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug */ suspend fun setAuth(token: String? = null) + /** + * Creates a new [RealtimeChannel] and adds it to the [subscriptions] + * + * - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so. + * - If a channel with the same [channelId] already exists, it will be returned + * + * @param channelId The id of the channel + * @param builder The builder for the channel + */ + fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel + /** * @property websocketConfig Custom configuration for the Ktor Websocket Client. This only applies if [Realtime.Config.websocketFactory] is null. * @property secure Whether to use wss or ws. Defaults to [SupabaseClient.useHTTPS] when null @@ -187,11 +195,15 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug } /** - * Creates a new [RealtimeChannel] + * Creates a new [RealtimeChannel] and adds it to the [Realtime.subscriptions] + * + * - This method does not subscribe to the channel. You have to call [RealtimeChannel.subscribe] to do so. + * - If a channel with the same [channelId] already exists, it will be returned + * + * @param channelId The id of the channel + * @param builder The builder for the channel */ -inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel { - return RealtimeChannelBuilder("realtime:$channelId", this as RealtimeImpl).apply(builder).build() -} +inline fun Realtime.channel(channelId: String, builder: RealtimeChannelBuilder.() -> Unit = {}): RealtimeChannel = channel(channelId, RealtimeChannelBuilder(RealtimeTopic.withChannelId(channelId)).apply(builder)) /** * Supabase Realtime is a way to listen to changes in the PostgreSQL database via websockets diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelBuilder.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelBuilder.kt index 14e6d954..1edd17c3 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelBuilder.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelBuilder.kt @@ -7,7 +7,7 @@ import io.github.jan.supabase.realtime.annotations.ChannelDsl * Used to build a realtime channel */ @ChannelDsl -class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String, private val realtimeImpl: RealtimeImpl) { +class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String) { private var broadcastJoinConfig = BroadcastJoinConfig(acknowledgeBroadcasts = false, receiveOwnBroadcasts = false) private var presenceJoinConfig = PresenceJoinConfig("") @@ -33,9 +33,9 @@ class RealtimeChannelBuilder @PublishedApi internal constructor(private val topi } @SupabaseInternal - fun build(): RealtimeChannel { + fun build(realtime: Realtime): RealtimeChannel { return RealtimeChannelImpl( - realtimeImpl, + realtime, topic, broadcastJoinConfig, presenceJoinConfig, diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt index 71197593..3d11ef80 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt @@ -27,23 +27,23 @@ import kotlin.reflect.KClass import kotlin.reflect.KType internal class RealtimeChannelImpl( - private val realtimeImpl: RealtimeImpl, + override val realtime: Realtime, override val topic: String, private val broadcastJoinConfig: BroadcastJoinConfig, private val presenceJoinConfig: PresenceJoinConfig, private val isPrivate: Boolean, ) : RealtimeChannel { + private val realtimeImpl: RealtimeImpl = realtime as RealtimeImpl private val clientChanges = AtomicMutableList() @SupabaseInternal override val callbackManager = CallbackManagerImpl(realtimeImpl.serializer) private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED) override val status = _status.asStateFlow() - override val realtime: Realtime = realtimeImpl override val supabaseClient = realtimeImpl.supabaseClient private val broadcastUrl = realtimeImpl.broadcastUrl() - private val subTopic = topic.replaceFirst(Regex("^realtime:", RegexOption.IGNORE_CASE), "") + private val subTopic = topic.replaceFirst(Regex("^${RealtimeTopic.PREFIX}:", RegexOption.IGNORE_CASE), "") private val httpClient = realtimeImpl.supabaseClient.httpClient private suspend fun accessToken() = realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken @@ -54,9 +54,6 @@ internal class RealtimeChannelImpl( if(!realtimeImpl.config.connectOnSubscribe) error("You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?") realtimeImpl.connect() } - realtimeImpl.run { - addChannel(this@RealtimeChannelImpl) - } _status.value = RealtimeChannel.Status.SUBSCRIBING Realtime.logger.d { "Subscribing to channel $topic" } val currentJwt = accessToken() diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt index 28994f4b..b3670957 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt @@ -1,7 +1,6 @@ package io.github.jan.supabase.realtime import io.github.jan.supabase.SupabaseClient -import io.github.jan.supabase.annotations.SupabaseInternal import io.github.jan.supabase.auth.Auth import io.github.jan.supabase.auth.status.SessionStatus import io.github.jan.supabase.buildUrl @@ -162,6 +161,14 @@ import kotlin.io.encoding.ExperimentalEncodingApi _status.value = Realtime.Status.DISCONNECTED } + override fun channel(channelId: String, builder: RealtimeChannelBuilder): RealtimeChannel { + val topic = RealtimeTopic.withChannelId(channelId) + if(subscriptions.containsKey(topic)) return subscriptions[topic]!! + val channel = builder.build(this) + _subscriptions[topic] = channel + return channel + } + private suspend fun onMessage(message: RealtimeMessage) { Realtime.logger.d { "Received message $message" } val channel = subscriptions[message.topic] as? RealtimeChannelImpl @@ -234,11 +241,6 @@ import kotlin.io.encoding.ExperimentalEncodingApi } } - @SupabaseInternal - override fun Realtime.addChannel(channel: RealtimeChannel) { - _subscriptions[channel.topic] = channel - } - override suspend fun close() { disconnect() } diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeTopic.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeTopic.kt new file mode 100644 index 00000000..97bc9847 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeTopic.kt @@ -0,0 +1,12 @@ +package io.github.jan.supabase.realtime + +@PublishedApi +internal object RealtimeTopic { + + const val PREFIX = "realtime" + + fun withChannelId(channelId: String): String { + return "$PREFIX:$channelId" + } + +} \ No newline at end of file diff --git a/Realtime/src/commonTest/kotlin/RealtimeTest.kt b/Realtime/src/commonTest/kotlin/RealtimeTest.kt index 5ff705e6..d6689f25 100644 --- a/Realtime/src/commonTest/kotlin/RealtimeTest.kt +++ b/Realtime/src/commonTest/kotlin/RealtimeTest.kt @@ -1,6 +1,7 @@ import io.github.jan.supabase.realtime.Realtime import io.github.jan.supabase.realtime.RealtimeImpl import io.github.jan.supabase.realtime.RealtimeMessage +import io.github.jan.supabase.realtime.channel import io.github.jan.supabase.realtime.realtime import io.ktor.util.encodeBase64 import kotlinx.coroutines.test.runTest @@ -31,6 +32,22 @@ class RealtimeTest { } } + @Test + fun testExistingChannelShouldBeReturned() { + runTest { + createTestClient( + wsHandler = { _, _ -> + //Does not matter for this test + }, + supabaseHandler = { + val channel = it.realtime.channel("channelId") + val channel2 = it.realtime.channel("channelId") + assertEquals(channel, channel2) + } + ) + } + } + @Test fun testSendingRealtimeMessages() { val expectedMessage = RealtimeMessage( diff --git a/Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt b/Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt index 34edb30f..2250877f 100644 --- a/Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt +++ b/Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt @@ -5,6 +5,7 @@ import io.github.jan.supabase.realtime.Presence import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_PRESENCE_DIFF import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_SYSTEM import io.github.jan.supabase.realtime.RealtimeMessage +import io.github.jan.supabase.realtime.RealtimeTopic import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow @@ -29,18 +30,18 @@ suspend fun Auth.importAuthTokenValid(token: String) { suspend fun handleSubscribe(incoming: ReceiveChannel, outgoing: SendChannel, channelId: String) { incoming.receive() - outgoing.send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, "")) + outgoing.send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, "")) } suspend fun SendChannel.sendBroadcast(channelId: String, event: String, message: JsonObject) { - send(RealtimeMessage("realtime:$channelId", "broadcast", buildJsonObject { + send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), "broadcast", buildJsonObject { put("event", event) put("payload", message) }, "")) } suspend fun SendChannel.sendPresence(channelId: String, joins: Map, leaves: Map) { - send(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject { + send(RealtimeMessage(RealtimeTopic.withChannelId(channelId), CHANNEL_EVENT_PRESENCE_DIFF, buildJsonObject { put("joins", transformPresenceMap(joins)) put("leaves", transformPresenceMap(leaves)) }, "")) diff --git a/Realtime/src/commonTest/kotlin/RealtimeTopicTest.kt b/Realtime/src/commonTest/kotlin/RealtimeTopicTest.kt new file mode 100644 index 00000000..a66a0a24 --- /dev/null +++ b/Realtime/src/commonTest/kotlin/RealtimeTopicTest.kt @@ -0,0 +1,19 @@ +import io.github.jan.supabase.realtime.RealtimeTopic +import kotlin.test.Test +import kotlin.test.assertEquals + +class RealtimeTopicTest { + + @Test + fun testRealtimeTopic() { + val channelId = "channelId" + val topic = RealtimeTopic.withChannelId(channelId) + assertEquals("realtime:channelId", topic) + } + + @Test + fun testRealtimePrefix() { + assertEquals("realtime", RealtimeTopic.PREFIX) + } + +} \ No newline at end of file