Skip to content

Commit

Permalink
Merge pull request #696 from supabase-community/realtime-event-refactor
Browse files Browse the repository at this point in the history
Refactor Realtime event system
  • Loading branch information
jan-tennert authored Aug 27, 2024
2 parents 434e1b5 + c66dc9d commit eccacf8
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
fun disconnect()

@SupabaseInternal
fun RealtimeChannel.addChannel(channel: RealtimeChannel)

@SupabaseInternal
fun RealtimeChannel.deleteChannel(channel: RealtimeChannel)
fun Realtime.addChannel(channel: RealtimeChannel)

/**
* Unsubscribes and removes a channel from the [subscriptions]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ sealed interface RealtimeChannel {
*/
val supabaseClient: SupabaseClient

/**
* The realtime instance
*/
val realtime: Realtime

@SupabaseInternal
val callbackManager: CallbackManager

Expand Down Expand Up @@ -102,6 +107,9 @@ sealed interface RealtimeChannel {
@SupabaseInternal
fun RealtimeChannel.removePostgresChange(data: PostgresJoinConfig)

@SupabaseInternal
fun updateStatus(status: Status)

/**
* Represents the status of a channel
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package io.github.jan.supabase.realtime

import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.collections.AtomicMutableList
import io.github.jan.supabase.decodeIfNotEmptyOrDefault
import io.github.jan.supabase.gotrue.resolveAccessToken
import io.github.jan.supabase.logging.d
import io.github.jan.supabase.logging.e
import io.github.jan.supabase.logging.w
import io.github.jan.supabase.putJsonObject
import io.github.jan.supabase.realtime.data.BroadcastApiBody
import io.github.jan.supabase.realtime.data.BroadcastApiMessage
import io.github.jan.supabase.realtime.data.PostgresActionData
import io.github.jan.supabase.supabaseJson
import io.github.jan.supabase.realtime.event.RealtimeEvent
import io.ktor.client.statement.bodyAsText
import io.ktor.http.headers
import kotlinx.coroutines.channels.awaitClose
Expand All @@ -23,12 +20,8 @@ import kotlinx.coroutines.flow.first
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement
import kotlinx.serialization.json.jsonArray
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.longOrNull
import kotlinx.serialization.json.put
import kotlinx.serialization.json.putJsonObject
import kotlin.reflect.KClass
Expand All @@ -47,6 +40,7 @@ internal class RealtimeChannelImpl(
override val callbackManager = CallbackManagerImpl(realtimeImpl.serializer)
private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED)
override val status = _status.asStateFlow()
override val realtime: Realtime = realtimeImpl

override val supabaseClient = realtimeImpl.supabaseClient

Expand Down Expand Up @@ -84,71 +78,13 @@ internal class RealtimeChannelImpl(
}

@OptIn(SupabaseInternal::class)
@Suppress("CyclomaticComplexMethod") //TODO: Refactor this method
fun onMessage(message: RealtimeMessage) {
if(message.eventType == null) {
Realtime.logger.e { "Received message without event type: $message" }
suspend fun onMessage(message: RealtimeMessage) {
val event = RealtimeEvent.resolveEvent(message)
if(event == null) {
Realtime.logger.e { "Received message without event: $message" }
return
}
when(message.eventType) {
RealtimeMessage.EventType.TOKEN_EXPIRED -> {
Realtime.logger.w { "Received token expired event. This should not happen, please report this warning." }
}
RealtimeMessage.EventType.SYSTEM -> {
Realtime.logger.d { "Subscribed to channel ${message.topic}" }
_status.value = RealtimeChannel.Status.SUBSCRIBED
}
RealtimeMessage.EventType.SYSTEM_REPLY -> {
Realtime.logger.d { "Received system reply: ${message.payload}." }
if(status.value == RealtimeChannel.Status.UNSUBSCRIBING) {
_status.value = RealtimeChannel.Status.UNSUBSCRIBED
Realtime.logger.d { "Unsubscribed from channel ${message.topic}" }
}
}
RealtimeMessage.EventType.POSTGRES_SERVER_CHANGES -> { //check if the server postgres_changes match with the client's and add the given id to the postgres change objects (to identify them later in the events)
val serverPostgresChanges = message.payload["response"]?.jsonObject?.get("postgres_changes")?.jsonArray?.let { Json.decodeFromJsonElement<List<PostgresJoinConfig>>(it) } ?: listOf() //server postgres changes
callbackManager.setServerChanges(serverPostgresChanges)
if(status.value != RealtimeChannel.Status.SUBSCRIBED) {
Realtime.logger.d { "Joined channel ${message.topic}" }
_status.value = RealtimeChannel.Status.SUBSCRIBED
}
}
RealtimeMessage.EventType.POSTGRES_CHANGES -> {
val data = message.payload["data"]?.jsonObject ?: return
val ids = message.payload["ids"]?.jsonArray?.mapNotNull { it.jsonPrimitive.longOrNull } ?: emptyList() //the ids of the matching postgres changes
val postgresAction = supabaseJson.decodeFromJsonElement<PostgresActionData>(data)
val action = when(data["type"]?.jsonPrimitive?.content ?: "") {
"UPDATE" -> PostgresAction.Update(postgresAction.record ?: error("Received no record on update event"), postgresAction.oldRecord ?: error("Received no old record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer)
"DELETE" -> PostgresAction.Delete(postgresAction.oldRecord ?: error("Received no old record on delete event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer)
"INSERT" -> PostgresAction.Insert(postgresAction.record ?: error("Received no record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer)
"SELECT" -> PostgresAction.Select(postgresAction.record ?: error("Received no record on update event"), postgresAction.columns, postgresAction.commitTimestamp, realtimeImpl.serializer)
else -> error("Unknown event type ${message.event}")
}
callbackManager.triggerPostgresChange(ids, action)
}
RealtimeMessage.EventType.BROADCAST -> {
val event = message.payload["event"]?.jsonPrimitive?.content ?: ""
callbackManager.triggerBroadcast(event, message.payload["payload"]?.jsonObject ?: JsonObject(mutableMapOf()))
}
RealtimeMessage.EventType.CLOSE -> {
realtimeImpl.run {
deleteChannel(this@RealtimeChannelImpl)
}
Realtime.logger.d { "Unsubscribed from channel ${message.topic}" }
}
RealtimeMessage.EventType.ERROR -> {
Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token" }
}
RealtimeMessage.EventType.PRESENCE_DIFF -> {
val joins = message.payload["joins"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf<String, Presence>()) ?: emptyMap()
val leaves = message.payload["leaves"]?.jsonObject?.decodeIfNotEmptyOrDefault(mapOf<String, Presence>()) ?: emptyMap()
callbackManager.triggerPresenceDiff(joins, leaves)
}
RealtimeMessage.EventType.PRESENCE_STATE -> {
val joins = message.payload.decodeIfNotEmptyOrDefault(mapOf<String, Presence>())
callbackManager.triggerPresenceDiff(joins, mapOf())
}
}
event.handle(this, message)
}

override suspend fun unsubscribe() {
Expand Down Expand Up @@ -277,5 +213,9 @@ internal class RealtimeChannelImpl(
awaitClose { callbackManager.removeCallbackById(id) }
}

override fun updateStatus(status: RealtimeChannel.Status) {
_status.value = status
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,23 @@ import io.github.jan.supabase.logging.i
import io.github.jan.supabase.logging.w
import io.github.jan.supabase.realtime.websocket.KtorRealtimeWebsocketFactory
import io.github.jan.supabase.realtime.websocket.RealtimeWebsocket
import io.github.jan.supabase.supabaseJson
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.client.plugins.websocket.sendSerialized
import io.ktor.client.statement.HttpResponse
import io.ktor.http.URLProtocol
import io.ktor.http.path
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.buildJsonObject
import kotlin.time.Duration.Companion.milliseconds

@PublishedApi internal class RealtimeImpl(override val supabaseClient: SupabaseClient, override val config: Realtime.Config) : Realtime {

Expand Down Expand Up @@ -163,7 +155,7 @@ import kotlin.time.Duration.Companion.milliseconds
_status.value = Realtime.Status.DISCONNECTED
}

private fun onMessage(message: RealtimeMessage) {
private suspend fun onMessage(message: RealtimeMessage) {
Realtime.logger.d { "Received message $message" }
val channel = subscriptions[message.topic] as? RealtimeChannelImpl
if(message.ref?.toIntOrNull() == heartbeatRef) {
Expand Down Expand Up @@ -208,11 +200,6 @@ import kotlin.time.Duration.Companion.milliseconds
}
}

@SupabaseInternal
override fun RealtimeChannel.deleteChannel(channel: RealtimeChannel) {
_subscriptions.remove(channel.topic)
}

override suspend fun removeAllChannels() {
_subscriptions.forEach { (_, it) ->
if(it.status.value == RealtimeChannel.Status.SUBSCRIBED) {
Expand All @@ -227,7 +214,7 @@ import kotlin.time.Duration.Companion.milliseconds
}

@SupabaseInternal
override fun RealtimeChannel.addChannel(channel: RealtimeChannel) {
override fun Realtime.addChannel(channel: RealtimeChannel) {
_subscriptions[channel.topic] = channel
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,11 @@ package io.github.jan.supabase.realtime

import io.github.jan.supabase.annotations.SupabaseInternal
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive

/**
* Represents a message retrieved by the [RealtimeChannel]
*/
@Serializable
@SupabaseInternal
data class RealtimeMessage(val topic: String, val event: String, val payload: JsonObject, val ref: String?) {

@Transient
val eventType: EventType? = when {
event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && payload["status"]?.jsonPrimitive?.content == "ok" -> EventType.SYSTEM
event == RealtimeChannel.CHANNEL_EVENT_REPLY && payload["response"]?.jsonObject?.containsKey(RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES) ?: false -> EventType.POSTGRES_SERVER_CHANGES
event == RealtimeChannel.CHANNEL_EVENT_REPLY && payload["status"]?.jsonPrimitive?.content == "ok" -> EventType.SYSTEM_REPLY
event == RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES -> EventType.POSTGRES_CHANGES
event == RealtimeChannel.CHANNEL_EVENT_BROADCAST -> EventType.BROADCAST
event == RealtimeChannel.CHANNEL_EVENT_CLOSE -> EventType.CLOSE
event == RealtimeChannel.CHANNEL_EVENT_ERROR -> EventType.ERROR
event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_DIFF -> EventType.PRESENCE_DIFF
event == RealtimeChannel.CHANNEL_EVENT_PRESENCE_STATE -> EventType.PRESENCE_STATE
event == RealtimeChannel.CHANNEL_EVENT_SYSTEM && payload["message"]?.jsonPrimitive?.content?.contains("access token has expired") ?: false -> EventType.TOKEN_EXPIRED
else -> null
}

enum class EventType {
SYSTEM, SYSTEM_REPLY, POSTGRES_SERVER_CHANGES, POSTGRES_CHANGES, BROADCAST, CLOSE, ERROR, PRESENCE_DIFF, PRESENCE_STATE, TOKEN_EXPIRED
}

}

data class RealtimeMessage(val topic: String, val event: String, val payload: JsonObject, val ref: String?)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.jan.supabase.realtime.event

import io.github.jan.supabase.realtime.RealtimeChannel
import io.github.jan.supabase.realtime.RealtimeMessage
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive

/**
* Handles broadcast events
*/
data object RBroadcastEvent : RealtimeEvent {

override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
val event = message.payload["event"]?.jsonPrimitive?.content ?: ""
channel.callbackManager.triggerBroadcast(event, message.payload["payload"]?.jsonObject ?: JsonObject(mutableMapOf()))
}

override fun appliesTo(message: RealtimeMessage): Boolean {
return message.event == RealtimeChannel.CHANNEL_EVENT_BROADCAST
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.github.jan.supabase.realtime.event

import io.github.jan.supabase.logging.d
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeChannel
import io.github.jan.supabase.realtime.RealtimeMessage

/**
* Event that handles the closing of a channel
*/
data object RCloseEvent : RealtimeEvent {

override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
channel.realtime.removeChannel(channel)
Realtime.logger.d { "Unsubscribed from channel ${message.topic}" }
}

override fun appliesTo(message: RealtimeMessage): Boolean {
return message.event == RealtimeChannel.CHANNEL_EVENT_CLOSE
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.github.jan.supabase.realtime.event

import io.github.jan.supabase.logging.e
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeChannel
import io.github.jan.supabase.realtime.RealtimeMessage

/**
* Event that handles an error event
*/
data object RErrorEvent : RealtimeEvent {

override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token" }
}

override fun appliesTo(message: RealtimeMessage): Boolean {
return message.event == RealtimeChannel.CHANNEL_EVENT_ERROR
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.github.jan.supabase.realtime.event

import io.github.jan.supabase.realtime.PostgresAction
import io.github.jan.supabase.realtime.RealtimeChannel
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.data.PostgresActionData
import io.github.jan.supabase.realtime.realtime
import io.github.jan.supabase.supabaseJson
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.jsonArray
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.longOrNull

/**
* Handles postgres changes events
*/
data object RPostgresChangesEvent : RealtimeEvent {

override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
val data = message.payload["data"]?.jsonObject ?: return
val ids = message.payload["ids"]?.jsonArray?.mapNotNull { it.jsonPrimitive.longOrNull } ?: emptyList() //the ids of the matching postgres changes
val postgresAction = supabaseJson.decodeFromJsonElement<PostgresActionData>(data)
val action = when(data["type"]?.jsonPrimitive?.content ?: "") {
"UPDATE" -> PostgresAction.Update(
postgresAction.record ?: error("Received no record on update event"),
postgresAction.oldRecord ?: error("Received no old record on update event"),
postgresAction.columns,
postgresAction.commitTimestamp,
channel.supabaseClient.realtime.serializer
)
"DELETE" -> PostgresAction.Delete(
postgresAction.oldRecord ?: error("Received no old record on delete event"),
postgresAction.columns,
postgresAction.commitTimestamp,
channel.supabaseClient.realtime.serializer
)
"INSERT" -> PostgresAction.Insert(
postgresAction.record ?: error("Received no record on update event"),
postgresAction.columns,
postgresAction.commitTimestamp,
channel.supabaseClient.realtime.serializer
)
"SELECT" -> PostgresAction.Select(
postgresAction.record ?: error("Received no record on update event"),
postgresAction.columns,
postgresAction.commitTimestamp,
channel.supabaseClient.realtime.serializer
)
else -> error("Unknown event type ${message.event}")
}
channel.callbackManager.triggerPostgresChange(ids, action)
}

override fun appliesTo(message: RealtimeMessage): Boolean {
return message.event == RealtimeChannel.CHANNEL_EVENT_POSTGRES_CHANGES
}

}
Loading

0 comments on commit eccacf8

Please sign in to comment.