diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt index 37767e7fe..3d151ce1a 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt @@ -67,10 +67,7 @@ sealed interface Realtime : MainPlugin, CustomSerializationPlug fun disconnect() @SupabaseInternal - fun RealtimeChannel.addChannel(channel: RealtimeChannel) - - @SupabaseInternal - fun RealtimeChannel.deleteChannel(channel: RealtimeChannel) + fun Realtime.addChannel(channel: RealtimeChannel) /** * Unsubscribes and removes a channel from the [subscriptions] diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt index de7424192..6b4f8b4f3 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt @@ -31,6 +31,11 @@ sealed interface RealtimeChannel { */ val supabaseClient: SupabaseClient + /** + * The realtime instance + */ + val realtime: Realtime + @SupabaseInternal val callbackManager: CallbackManager @@ -102,6 +107,9 @@ sealed interface RealtimeChannel { @SupabaseInternal fun RealtimeChannel.removePostgresChange(data: PostgresJoinConfig) + @SupabaseInternal + fun updateStatus(status: Status) + /** * Represents the status of a channel */ diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt index c1fa9be46..7ddaa666b 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt @@ -2,16 +2,13 @@ package io.github.jan.supabase.realtime import io.github.jan.supabase.annotations.SupabaseInternal import io.github.jan.supabase.collections.AtomicMutableList -import io.github.jan.supabase.decodeIfNotEmptyOrDefault import io.github.jan.supabase.gotrue.resolveAccessToken import io.github.jan.supabase.logging.d import io.github.jan.supabase.logging.e -import io.github.jan.supabase.logging.w import io.github.jan.supabase.putJsonObject import io.github.jan.supabase.realtime.data.BroadcastApiBody import io.github.jan.supabase.realtime.data.BroadcastApiMessage -import io.github.jan.supabase.realtime.data.PostgresActionData -import io.github.jan.supabase.supabaseJson +import io.github.jan.supabase.realtime.event.RealtimeEvent import io.ktor.client.statement.bodyAsText import io.ktor.http.headers import kotlinx.coroutines.channels.awaitClose @@ -23,12 +20,8 @@ import kotlinx.coroutines.flow.first import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonObject -import kotlinx.serialization.json.decodeFromJsonElement import kotlinx.serialization.json.encodeToJsonElement -import kotlinx.serialization.json.jsonArray import kotlinx.serialization.json.jsonObject -import kotlinx.serialization.json.jsonPrimitive -import kotlinx.serialization.json.longOrNull import kotlinx.serialization.json.put import kotlinx.serialization.json.putJsonObject import kotlin.reflect.KClass @@ -47,6 +40,7 @@ internal class RealtimeChannelImpl( override val callbackManager = CallbackManagerImpl(realtimeImpl.serializer) private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED) override val status = _status.asStateFlow() + override val realtime: Realtime = realtimeImpl override val supabaseClient = realtimeImpl.supabaseClient @@ -84,71 +78,13 @@ internal class RealtimeChannelImpl( } @OptIn(SupabaseInternal::class) - @Suppress("CyclomaticComplexMethod") //TODO: Refactor this method - fun onMessage(message: RealtimeMessage) { - if(message.eventType == null) { - Realtime.logger.e { "Received message without event type: $message" } + suspend fun onMessage(message: RealtimeMessage) { + val event = RealtimeEvent.resolveEvent(message) + if(event == null) { + Realtime.logger.e { "Received message without event: $message" } return } - when(message.eventType) { - RealtimeMessage.EventType.TOKEN_EXPIRED -> { - Realtime.logger.w { "Received token expired event. This should not happen, please report this warning." } - } - RealtimeMessage.EventType.SYSTEM -> { - Realtime.logger.d { "Subscribed to channel ${message.topic}" } - _status.value = RealtimeChannel.Status.SUBSCRIBED - } - RealtimeMessage.EventType.SYSTEM_REPLY -> { - Realtime.logger.d { "Received system reply: ${message.payload}." } - if(status.value == RealtimeChannel.Status.UNSUBSCRIBING) { - _status.value = RealtimeChannel.Status.UNSUBSCRIBED - Realtime.logger.d { "Unsubscribed from channel ${message.topic}" } - } - } - RealtimeMessage.EventType.POSTGRES_SERVER_CHANGES -> { //check if the server postgres_changes match with the client's and add the given id to the postgres change objects (to identify them later in the events) - val serverPostgresChanges = message.payload["response"]?.jsonObject?.get("postgres_changes")?.jsonArray?.let { Json.decodeFromJsonElement>(it) } ?: listOf() //server postgres changes - callbackManager.setServerChanges(serverPostgresChanges) - if(status.value != RealtimeChannel.Status.SUBSCRIBED) { - Realtime.logger.d { "Joined channel ${message.topic}" } - _status.value = RealtimeChannel.Status.SUBSCRIBED - } - } - RealtimeMessage.EventType.POSTGRES_CHANGES -> { - val data = message.payload["data"]?.jsonObject ?: return - val ids = message.payload["ids"]?.jsonArray?.mapNotNull { it.jsonPrimitive.longOrNull } ?: emptyList() //the ids of the matching postgres changes - val postgresAction = supabaseJson.decodeFromJsonElement(data) - val action = when(data["type"]?.jsonPrimitive?.content ?: "") { - "UPDATE" -> PostgresAction.Update(postgresAction.record ?: error("Received no record on update event"), postgresAction.oldRecord ?: error("Received no old record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer) - "DELETE" -> PostgresAction.Delete(postgresAction.oldRecord ?: error("Received no old record on delete event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer) - "INSERT" -> PostgresAction.Insert(postgresAction.record ?: error("Received no record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer) - "SELECT" -> PostgresAction.Select(postgresAction.record ?: error("Received no record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer) - else -> error("Unknown event type ${message.event}") - } - callbackManager.triggerPostgresChange(ids, action) - } - RealtimeMessage.EventType.BROADCAST -> { - val event = message.payload["event"]?.jsonPrimitive?.content ?: "" - callbackManager.triggerBroadcast(event, message.payload["payload"]?.jsonObject ?: JsonObject(mutableMapOf())) - } - RealtimeMessage.EventType.CLOSE -> { - realtimeImpl.run { - deleteChannel(this@RealtimeChannelImpl) - } - Realtime.logger.d { "Unsubscribed from channel ${message.topic}" } - } - RealtimeMessage.EventType.ERROR -> { - Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token" } - } - RealtimeMessage.EventType.PRESENCE_DIFF -> { - val joins = message.payload["joins"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf()) ?: emptyMap() - val leaves = message.payload["leaves"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf()) ?: emptyMap() - callbackManager.triggerPresenceDiff(joins, leaves) - } - RealtimeMessage.EventType.PRESENCE_STATE -> { - val joins = message.payload.decodeIfNotEmptyOrDefault(mapOf()) - callbackManager.triggerPresenceDiff(joins, mapOf()) - } - } + event.handle(this, message) } override suspend fun unsubscribe() { @@ -277,5 +213,9 @@ internal class RealtimeChannelImpl( awaitClose { callbackManager.removeCallbackById(id) } } + override fun updateStatus(status: RealtimeChannel.Status) { + _status.value = status + } + } diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt index 66b4d560e..3acb90111 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt @@ -14,31 +14,23 @@ import io.github.jan.supabase.logging.i import io.github.jan.supabase.logging.w import io.github.jan.supabase.realtime.websocket.KtorRealtimeWebsocketFactory import io.github.jan.supabase.realtime.websocket.RealtimeWebsocket -import io.github.jan.supabase.supabaseJson -import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -import io.ktor.client.plugins.websocket.sendSerialized import io.ktor.client.statement.HttpResponse import io.ktor.http.URLProtocol import io.ktor.http.path -import io.ktor.websocket.Frame -import io.ktor.websocket.readText import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.isActive -import kotlinx.coroutines.job import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.serialization.json.buildJsonObject -import kotlin.time.Duration.Companion.milliseconds @PublishedApi internal class RealtimeImpl(override val supabaseClient: SupabaseClient, override val config: Realtime.Config) : Realtime { @@ -163,7 +155,7 @@ import kotlin.time.Duration.Companion.milliseconds _status.value = Realtime.Status.DISCONNECTED } - private fun onMessage(message: RealtimeMessage) { + private suspend fun onMessage(message: RealtimeMessage) { Realtime.logger.d { "Received message $message" } val channel = subscriptions[message.topic] as? RealtimeChannelImpl if(message.ref?.toIntOrNull() == heartbeatRef) { @@ -208,11 +200,6 @@ import kotlin.time.Duration.Companion.milliseconds } } - @SupabaseInternal - override fun RealtimeChannel.deleteChannel(channel: RealtimeChannel) { - _subscriptions.remove(channel.topic) - } - override suspend fun removeAllChannels() { _subscriptions.forEach { (_, it) -> if(it.status.value == RealtimeChannel.Status.SUBSCRIBED) { @@ -227,7 +214,7 @@ import kotlin.time.Duration.Companion.milliseconds } @SupabaseInternal - override fun RealtimeChannel.addChannel(channel: RealtimeChannel) { + override fun Realtime.addChannel(channel: RealtimeChannel) { _subscriptions[channel.topic] = channel } diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeMessage.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeMessage.kt index 6832f15af..379017d35 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeMessage.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeMessage.kt @@ -2,36 +2,11 @@ package io.github.jan.supabase.realtime import io.github.jan.supabase.annotations.SupabaseInternal import kotlinx.serialization.Serializable -import kotlinx.serialization.Transient import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.jsonObject -import kotlinx.serialization.json.jsonPrimitive /** * Represents a message retrieved by the [RealtimeChannel] */ @Serializable @SupabaseInternal -data class RealtimeMessage(val topic: String, val event: String, val payload: JsonObject, val ref: String?) { - - @Transient - val eventType: EventType? = when { - event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && payload["status"]?.jsonPrimitive?.content == "ok" -> EventType.SYSTEM - event == RealtimeChannel.CHANNEL_EVENT_REPLY && payload["response"]?.jsonObject?.containsKey(RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES) ?: false -> EventType.POSTGRES_SERVER_CHANGES - event == RealtimeChannel.CHANNEL_EVENT_REPLY && payload["status"]?.jsonPrimitive?.content == "ok" -> EventType.SYSTEM_REPLY - event == RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES -> EventType.POSTGRES_CHANGES - event == RealtimeChannel.CHANNEL_EVENT_BROADCAST -> EventType.BROADCAST - event == RealtimeChannel.CHANNEL_EVENT_CLOSE -> EventType.CLOSE - event == RealtimeChannel.CHANNEL_EVENT_ERROR -> EventType.ERROR - event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_DIFF -> EventType.PRESENCE_DIFF - event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_STATE -> EventType.PRESENCE_STATE - event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && payload["message"]?.jsonPrimitive?.content?.contains("access token has expired") ?: false -> EventType.TOKEN_EXPIRED - else -> null - } - - enum class EventType { - SYSTEM, SYSTEM_REPLY, POSTGRES_SERVER_CHANGES, POSTGRES_CHANGES, BROADCAST, CLOSE, ERROR, PRESENCE_DIFF, PRESENCE_STATE, TOKEN_EXPIRED - } - -} - +data class RealtimeMessage(val topic: String, val event: String, val payload: JsonObject, val ref: String?) \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RBroadcastEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RBroadcastEvent.kt new file mode 100644 index 000000000..e163eda1b --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RBroadcastEvent.kt @@ -0,0 +1,23 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive + +/** + * Handles broadcast events + */ +data object RBroadcastEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + val event = message.payload["event"]?.jsonPrimitive?.content ?: "" + channel.callbackManager.triggerBroadcast(event, message.payload["payload"]?.jsonObject ?: JsonObject(mutableMapOf())) + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_BROADCAST + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RCloseEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RCloseEvent.kt new file mode 100644 index 000000000..1ef7d52c8 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RCloseEvent.kt @@ -0,0 +1,22 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.d +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage + +/** + * Event that handles the closing of a channel + */ +data object RCloseEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + channel.realtime.removeChannel(channel) + Realtime.logger.d { "Unsubscribed from channel ${message.topic}" } + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_CLOSE + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RErrorEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RErrorEvent.kt new file mode 100644 index 000000000..1a926475b --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RErrorEvent.kt @@ -0,0 +1,21 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.e +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage + +/** + * Event that handles an error event + */ +data object RErrorEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token" } + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_ERROR + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresChangesEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresChangesEvent.kt new file mode 100644 index 000000000..ea318ba9b --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresChangesEvent.kt @@ -0,0 +1,59 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.realtime.PostgresAction +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import io.github.jan.supabase.realtime.data.PostgresActionData +import io.github.jan.supabase.realtime.realtime +import io.github.jan.supabase.supabaseJson +import kotlinx.serialization.json.decodeFromJsonElement +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.longOrNull + +/** + * Handles postgres changes events + */ +data object RPostgresChangesEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + val data = message.payload["data"]?.jsonObject ?: return + val ids = message.payload["ids"]?.jsonArray?.mapNotNull { it.jsonPrimitive.longOrNull } ?: emptyList() //the ids of the matching postgres changes + val postgresAction = supabaseJson.decodeFromJsonElement(data) + val action = when(data["type"]?.jsonPrimitive?.content ?: "") { + "UPDATE" -> PostgresAction.Update( + postgresAction.record ?: error("Received no record on update event"), + postgresAction.oldRecord ?: error("Received no old record on update event"), + postgresAction.columns, + postgresAction.commitTimestamp, + channel.supabaseClient.realtime.serializer + ) + "DELETE" -> PostgresAction.Delete( + postgresAction.oldRecord ?: error("Received no old record on delete event"), + postgresAction.columns, + postgresAction.commitTimestamp, + channel.supabaseClient.realtime.serializer + ) + "INSERT" -> PostgresAction.Insert( + postgresAction.record ?: error("Received no record on update event"), + postgresAction.columns, + postgresAction.commitTimestamp, + channel.supabaseClient.realtime.serializer + ) + "SELECT" -> PostgresAction.Select( + postgresAction.record ?: error("Received no record on update event"), + postgresAction.columns, + postgresAction.commitTimestamp, + channel.supabaseClient.realtime.serializer + ) + else -> error("Unknown event type ${message.event}") + } + channel.callbackManager.triggerPostgresChange(ids, action) + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresServerChangesEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresServerChangesEvent.kt new file mode 100644 index 000000000..0f8834ef2 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPostgresServerChangesEvent.kt @@ -0,0 +1,33 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.d +import io.github.jan.supabase.realtime.PostgresJoinConfig +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.decodeFromJsonElement +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject + +/** + * Event that handles the server changes + */ +data object RPostgresServerChangesEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + val serverPostgresChanges = message.payload["response"]?.jsonObject?.get("postgres_changes")?.jsonArray?.let { Json.decodeFromJsonElement>(it) } ?: listOf() //server postgres changes + channel.callbackManager.setServerChanges(serverPostgresChanges) + if(channel.status.value != RealtimeChannel.Status.SUBSCRIBED) { + Realtime.logger.d { "Joined channel ${message.topic}" } + channel.updateStatus(RealtimeChannel.Status.SUBSCRIBED) + } + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_REPLY && message.payload["response"]?.jsonObject?.containsKey( + RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES + ) ?: false + } + +} diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceDiffEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceDiffEvent.kt new file mode 100644 index 000000000..7d2be3910 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceDiffEvent.kt @@ -0,0 +1,24 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.decodeIfNotEmptyOrDefault +import io.github.jan.supabase.realtime.Presence +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.jsonObject + +/** + * Event that handles the presence diff event + */ +data object RPresenceDiffEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + val joins = message.payload["joins"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf()) ?: emptyMap() + val leaves = message.payload["leaves"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf()) ?: emptyMap() + channel.callbackManager.triggerPresenceDiff(joins, leaves) + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_DIFF + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceStateEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceStateEvent.kt new file mode 100644 index 000000000..53b65dbe1 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RPresenceStateEvent.kt @@ -0,0 +1,22 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.decodeIfNotEmptyOrDefault +import io.github.jan.supabase.realtime.Presence +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage + +/** + * Event that handles the presence state event + */ +data object RPresenceStateEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + val joins = message.payload.decodeIfNotEmptyOrDefault(mapOf()) + channel.callbackManager.triggerPresenceDiff(joins, mapOf()) + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_STATE + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemEvent.kt new file mode 100644 index 000000000..39b2828f0 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemEvent.kt @@ -0,0 +1,23 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.d +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.jsonPrimitive + +/** + * Event that handles the system event + */ +data object RSystemEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + Realtime.logger.d { "Subscribed to channel ${message.topic}" } + channel.updateStatus(RealtimeChannel.Status.SUBSCRIBED) + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && message.payload["status"]?.jsonPrimitive?.content == "ok" + } + +} diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemReplyEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemReplyEvent.kt new file mode 100644 index 000000000..715e85ae4 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RSystemReplyEvent.kt @@ -0,0 +1,26 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.d +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.jsonPrimitive + +/** + * Event that handles the system reply event + */ +data object RSystemReplyEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + Realtime.logger.d { "Received system reply: ${message.payload}." } + if(channel.status.value == RealtimeChannel.Status.UNSUBSCRIBING) { + channel.updateStatus(RealtimeChannel.Status.UNSUBSCRIBED) + Realtime.logger.d { "Unsubscribed from channel ${message.topic}" } + } + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_REPLY && message.payload["status"]?.jsonPrimitive?.content == "ok" + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RTokenExpiredEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RTokenExpiredEvent.kt new file mode 100644 index 000000000..fd3802c19 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RTokenExpiredEvent.kt @@ -0,0 +1,22 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.logging.w +import io.github.jan.supabase.realtime.Realtime +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage +import kotlinx.serialization.json.jsonPrimitive + +/** + * Event that handles the token expired event + */ +data object RTokenExpiredEvent : RealtimeEvent { + + override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) { + Realtime.logger.w { "Received token expired event. This should not happen, please report this warning." } + } + + override fun appliesTo(message: RealtimeMessage): Boolean { + return message.event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && message.payload["message"]?.jsonPrimitive?.content?.contains("access token has expired") ?: false + } + +} \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RealtimeEvent.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RealtimeEvent.kt new file mode 100644 index 000000000..d74311006 --- /dev/null +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RealtimeEvent.kt @@ -0,0 +1,47 @@ +package io.github.jan.supabase.realtime.event + +import io.github.jan.supabase.realtime.RealtimeChannel +import io.github.jan.supabase.realtime.RealtimeMessage + +/** + * Interface for handling realtime events. + */ +internal sealed interface RealtimeEvent { + + /** + * Handles the event. + * @param channel The channel the event was received on. + * @param message The message that was received. + */ + suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) + + /** + * Checks if the event applies to the message. + */ + fun appliesTo(message: RealtimeMessage): Boolean + + companion object { + + private val EVENTS = setOf( // Kotlin doesn't provide a way to get all objects of a sealed interface outside the JVM, so we have to list them manually + RBroadcastEvent, + RCloseEvent, + RErrorEvent, + RPostgresChangesEvent, + RPostgresServerChangesEvent, + RPresenceStateEvent, + RPresenceDiffEvent, + RSystemEvent, + RTokenExpiredEvent, + RSystemReplyEvent + ) + + /** + * Resolves the event from a realtime message. + */ + fun resolveEvent(realtimeMessage: RealtimeMessage): RealtimeEvent? { + return EVENTS.firstOrNull { it.appliesTo(realtimeMessage) } + } + + } + +} \ No newline at end of file diff --git a/Realtime/src/commonTest/kotlin/RealtimeMessageEventTypeTest.kt b/Realtime/src/commonTest/kotlin/RealtimeEventTest.kt similarity index 69% rename from Realtime/src/commonTest/kotlin/RealtimeMessageEventTypeTest.kt rename to Realtime/src/commonTest/kotlin/RealtimeEventTest.kt index fedffe9a2..872393dc3 100644 --- a/Realtime/src/commonTest/kotlin/RealtimeMessageEventTypeTest.kt +++ b/Realtime/src/commonTest/kotlin/RealtimeEventTest.kt @@ -1,11 +1,22 @@ import io.github.jan.supabase.realtime.RealtimeChannel import io.github.jan.supabase.realtime.RealtimeMessage +import io.github.jan.supabase.realtime.event.RBroadcastEvent +import io.github.jan.supabase.realtime.event.RCloseEvent +import io.github.jan.supabase.realtime.event.RErrorEvent +import io.github.jan.supabase.realtime.event.RPostgresChangesEvent +import io.github.jan.supabase.realtime.event.RPostgresServerChangesEvent +import io.github.jan.supabase.realtime.event.RPresenceDiffEvent +import io.github.jan.supabase.realtime.event.RPresenceStateEvent +import io.github.jan.supabase.realtime.event.RSystemEvent +import io.github.jan.supabase.realtime.event.RSystemReplyEvent +import io.github.jan.supabase.realtime.event.RTokenExpiredEvent +import io.github.jan.supabase.realtime.event.RealtimeEvent import kotlinx.serialization.json.buildJsonObject import kotlinx.serialization.json.put import kotlin.test.Test import kotlin.test.assertEquals -class RealtimeMessageEventTypeTest { +class RealtimeEventTest { @Test fun testSystemType() { @@ -17,7 +28,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.SYSTEM, message.eventType) + assertEquals(RSystemEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -30,7 +41,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.SYSTEM_REPLY, message.eventType) + assertEquals(RSystemReplyEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -45,7 +56,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.POSTGRES_SERVER_CHANGES, message.eventType) + assertEquals(RPostgresServerChangesEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -58,7 +69,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.POSTGRES_CHANGES, message.eventType) + assertEquals(RPostgresChangesEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -71,7 +82,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.BROADCAST, message.eventType) + assertEquals(RBroadcastEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -84,7 +95,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.CLOSE, message.eventType) + assertEquals(RCloseEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -97,7 +108,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.ERROR, message.eventType) + assertEquals(RErrorEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -110,7 +121,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.PRESENCE_DIFF, message.eventType) + assertEquals(RPresenceDiffEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -123,7 +134,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.PRESENCE_STATE, message.eventType) + assertEquals(RPresenceStateEvent, RealtimeEvent.resolveEvent(message)) } @Test @@ -136,7 +147,7 @@ class RealtimeMessageEventTypeTest { }, ref = "" ) - assertEquals(RealtimeMessage.EventType.TOKEN_EXPIRED, message.eventType) + assertEquals(RTokenExpiredEvent, RealtimeEvent.resolveEvent(message)) } } \ No newline at end of file