[Bug]: Realtime - Subscribing to the same channel id causes upstream closes / unexpected behavior. #818

Open
2 of 3 tasks
Brainfree opened this issue Dec 21, 2024 · 3 comments
Labels
bug Something isn't working


Brainfree commented Dec 21, 2024

General Info

  • I checked for a similar bug report
  • I am using the latest version (latest/3.0.3 doesn't seem to solve the issue)
  • I checked the troubleshooting page for similar problems

Version(s)

3.0.2

Kotlin Target(s) and their respective versions

2.0.21 / JVM 8

What happened? (include your code)

Hey everyone, love the work everyone has done.

I am not sure whether I missed some details in the documentation, I am misunderstanding how presence works, or this is actually a bug. I was tinkering with channels and tried connecting multiple separate coroutines ("user clients") to one channel, where each client represents a user who is interested in the channel (think microservices). I expected it to just work, but when the second user subscribes to the channel, the upstream server seems to send a close command to the first subscriber. If I understand correctly, channels do not have to be created ahead of time, so I am confused about why the upstream server is forcing the channel closed. I dug into the Realtime logs, and the only hint I see is: "Tenant has no connected users, database connection will be terminated".

Am I restricted from doing in-process communication on the same channel (1 channel per client)? Do I need to be cautious about where the channel is sourced from, and avoid subscribing to it if a subscription already exists on the same JVM/client?

Something to note: If they all subscribe to different channels (ex: main-$user instead of main), it works fine.
It doesn't seem to be permissions or quota.

Edit: It does indeed seem to be 1 channel subscription (by id) per connection. I don't see where this is documented. If this is confirmed to be the case, perhaps the behavior should be to throw an error when it detects an existing subscription?

Thank you for your time.

import io.github.jan.supabase.createSupabaseClient
import io.github.jan.supabase.logging.LogLevel
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.channel
import io.github.jan.supabase.realtime.presenceDataFlow
import io.github.jan.supabase.realtime.realtime
import io.github.jan.supabase.realtime.track
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable

object Realtime {

    @Serializable
    data class Pres(val data: String)

    @JvmStatic
    fun main(args: Array<String>): Unit = runBlocking(Dispatchers.Default) {

        val client = createSupabaseClient(
            supabaseUrl = "<REDACTED>",
            supabaseKey = "<REDACTED>"
        ) {
            install(Realtime)
            defaultLogLevel = LogLevel.DEBUG
        }

        val realtime = client.realtime

        realtime.status.onEach {
            println("realtime.status = $it")
        }.launchIn(this)

        realtime.connect()

        repeat(3) { id ->
            launchClient(realtime, "user-$id")
        }

    }

    fun CoroutineScope.launchClient(realtime: Realtime, user: String) {
        launch {

            // all "user clients" attempt to connect to some common channel
            val channel = realtime.channel("main") {
                presence { key = user }
            }

            fun log(msg: Any?) = println("$user: $msg")

            channel.status.onEach { status ->
                log("status == $status")
            }.launchIn(this)

            channel.presenceDataFlow<Pres>().onEach { data ->
                log("data == $data")
            }.launchIn(this)

            log("subscribing...")
            channel.subscribe(blockUntilSubscribed = true)

            log("tracking...")
            channel.track(Pres("$user is ready"))

            log("waiting...")
            coroutineContext.job.join()
        }

    }

}

Steps To Reproduce (optional)

Run the snippet and you should see none of the "clients" make it past the tracking step.

Relevant log output (optional)

Connected to the target VM, address: '127.0.0.1:58569', transport: 'socket'
Info: (Supabase-Core) SupabaseClient created! Please report any bugs you find.
realtime.status = CONNECTING
Info: (Supabase-Realtime) Connected to realtime websocket!
realtime.status = CONNECTED
user-0: status == UNSUBSCRIBED
user-2: status == UNSUBSCRIBED
user-1: status == UNSUBSCRIBED
user-2: subscribing...
user-0: subscribing...
user-1: subscribing...
Debug: (Supabase-Realtime) Subscribing to channel realtime:main
user-0: status == SUBSCRIBING
Debug: (Supabase-Realtime) Subscribing to channel realtime:main
Debug: (Supabase-Realtime) Subscribing to channel realtime:main
user-1: status == SUBSCRIBING
user-2: status == SUBSCRIBING
Debug: (Supabase-Realtime) Subscribing to channel with body {"config":{"broadcast":{"ack":false,"self":false},"presence":{"key":"user-0"},"postgres_changes":[],"private":false}}
Debug: (Supabase-Realtime) Subscribing to channel with body {"config":{"broadcast":{"ack":false,"self":false},"presence":{"key":"user-1"},"postgres_changes":[],"private":false}}
Debug: (Supabase-Realtime) Subscribing to channel with body {"config":{"broadcast":{"ack":false,"self":false},"presence":{"key":"user-2"},"postgres_changes":[],"private":false}}
Debug: (Supabase-Realtime) Received message RealtimeMessage(topic=realtime:main, event=phx_reply, payload={"status":"ok","response":{"postgres_changes":[]}}, ref=null)
Debug: (Supabase-Realtime) Received event phx_reply for channel realtime:main
Debug: (Supabase-Realtime) Joined channel realtime:main
user-2: status == SUBSCRIBED
Debug: (Supabase-Realtime) Received message RealtimeMessage(topic=realtime:main, event=phx_reply, payload={"status":"ok","response":{"postgres_changes":[]}}, ref=null)
Debug: (Supabase-Realtime) Received event phx_reply for channel realtime:main
Debug: (Supabase-Realtime) Received message RealtimeMessage(topic=realtime:main, event=phx_reply, payload={"status":"ok","response":{"postgres_changes":[]}}, ref=null)
Debug: (Supabase-Realtime) Received event phx_reply for channel realtime:main
Debug: (Supabase-Realtime) Received message RealtimeMessage(topic=realtime:main, event=presence_state, payload={}, ref=null)
Debug: (Supabase-Realtime) Received event presence_state for channel realtime:main
Debug: (Supabase-Realtime) Received message RealtimeMessage(topic=realtime:main, event=phx_close, payload={}, ref=null)
Debug: (Supabase-Realtime) Received event phx_close for channel realtime:main
user-2: tracking...
Debug: (Supabase-Realtime) Unsubscribing from channel realtime:main
user-2: status == UNSUBSCRIBING
Debug: (Supabase-Realtime) No more subscriptions, disconnecting from realtime websocket
Debug: (Supabase-Realtime) Closing websocket connection
user-2: data == []
Debug: (Supabase-Realtime) Unsubscribed from channel realtime:main
realtime.status = DISCONNECTED
Exception in thread "main" java.lang.IllegalStateException: You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?
	at io.github.jan.supabase.realtime.RealtimeChannelImpl.track(RealtimeChannelImpl.kt:140)
	at mystic.proxyinspector.Realtime$launchClient$1.invokeSuspend(Realtime.kt:89)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith$$$capture(ContinuationImpl.kt:33)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:101)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:832)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
Disconnected from the target VM, address: '127.0.0.1:58569', transport: 'socket'

Process finished with exit code 1

Brainfree added the bug label Dec 21, 2024

jan-tennert (Collaborator) commented Dec 21, 2024

Well, you are trying to subscribe to the same channel multiple times with a single Supabase Client. Each new channel overrides the old one, that's how the subscriptions work:

@SupabaseInternal
override fun Realtime.addChannel(channel: RealtimeChannel) {
    _subscriptions[channel.topic] = channel
}

> If they all subscribe to different channels (ex: main-$user instead of main), it works fine.

That explains this: different ids mean they don't override each other.

The solution is just to have one client per user/device. So in your example, you would have a new Supabase Client for each user.
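
For illustration, here is a minimal sketch of the overriding behaviour and of the one-client-per-user fix, using a plain map instead of the library. `FakeChannel` and `FakeClient` are hypothetical stand-ins, not supabase-kt classes. The only detail taken from this thread is that subscriptions are stored by topic, as in the `addChannel` snippet above.

```kotlin
// Hypothetical stand-ins for illustration only; not the supabase-kt classes.
data class FakeChannel(val topic: String, val presenceKey: String)

class FakeClient {
    // Subscriptions are keyed by topic only, mirroring the addChannel snippet.
    private val subscriptions = mutableMapOf<String, FakeChannel>()

    fun addChannel(channel: FakeChannel) {
        // A second channel with the same topic silently replaces the first.
        subscriptions[channel.topic] = channel
    }

    fun subscribedKeys(): List<String> = subscriptions.values.map { it.presenceKey }
}

// Three "users" sharing one client and one topic: only the last one survives.
fun sharedClientKeys(): List<String> {
    val client = FakeClient()
    repeat(3) { id -> client.addChannel(FakeChannel("realtime:main", "user-$id")) }
    return client.subscribedKeys()
}

// One client per user: each client keeps its own subscription to the topic.
fun clientPerUserKeys(): List<String> =
    (0 until 3).flatMap { id ->
        val client = FakeClient()
        client.addChannel(FakeChannel("realtime:main", "user-$id"))
        client.subscribedKeys()
    }
```

In this model `sharedClientKeys()` returns `[user-2]`, while `clientPerUserKeys()` returns `[user-0, user-1, user-2]`.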

Brainfree (Author) commented Dec 22, 2024

> Well, you are trying to subscribe to the same channel multiple times with a single Supabase Client. Each new channel overrides the old one, that's how the subscriptions work:
>
> @SupabaseInternal
> override fun Realtime.addChannel(channel: RealtimeChannel) {
>     _subscriptions[channel.topic] = channel
> }
>
> > If they all subscribe to different channels (ex: main-$user instead of main), it works fine.
>
> That explains this: different ids mean they don't override each other.
>
> The solution is just to have one client per user/device. So in your example, you would have a new Supabase Client for each user.

I swapped out the key, the "userId" as exemplified in the docs. If I connect to a new channel under the context of a unique user, I would generally expect it to be keyed based on the <channel_id/topic, user_id> pair. So it's not possible to subscribe to a channel multiple times (as unique users) over a single connection? Channel ids are in a global context, so I didn't expect this behavior. At the very least, it should throw an error instead of allowing itself to do something that is not supported? If it's keyed based on the channel_id / topic, then shouldn't realtime.channel return the existing instance? Thanks!

jan-tennert (Collaborator) commented Dec 22, 2024

> If I connect to a new channel under the context of a unique user, I would generally expect it to be keyed based on the <channel_id/topic, user_id> pair.

Well, the key is only relevant for presences. Generally, any realtime messages (broadcasts, db changes, presences) are identified by the topic (which is the channel id) and then sent to the channel's event handlers. The presence key is not relevant for any client-side message identification (it is also not included in any received message).
Also, Realtime does not allow subscribing to the same channel multiple times within a single connection.
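
A rough sketch of what topic-only dispatch implies, under assumptions: `IncomingMessage` and `TopicRouter` are hypothetical names, and the fields only loosely mirror the `RealtimeMessage` entries in the log output above. It is not the library's dispatch code.

```kotlin
// Hypothetical model of client-side dispatch; not supabase-kt code.
data class IncomingMessage(val topic: String, val event: String, val payload: String)

class TopicRouter {
    private val handlers = mutableMapOf<String, (IncomingMessage) -> Unit>()

    // One handler per topic; the presence key never takes part in the lookup.
    fun register(topic: String, handler: (IncomingMessage) -> Unit) {
        handlers[topic] = handler
    }

    // Returns true if a handler was found for the message's topic.
    fun dispatch(message: IncomingMessage): Boolean {
        val handler = handlers[message.topic] ?: return false
        handler(message)
        return true
    }
}

// Three users register for the same topic, then a single join reply arrives.
fun routeJoinReply(): List<String> {
    val received = mutableListOf<String>()
    val router = TopicRouter()
    for (user in listOf("user-0", "user-1", "user-2")) {
        router.register("realtime:main") { msg -> received += "$user got ${msg.event}" }
    }
    router.dispatch(IncomingMessage("realtime:main", "phx_reply", "{}"))
    return received
}
```

In this model only the last registered handler sees the reply, which is at least consistent with only user-2 reaching SUBSCRIBED in the log output.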

> At the very least, it should throw an error instead of allowing itself to do something that is not supported? If it's keyed based on the channel_id / topic, then shouldn't realtime.channel return the existing instance? Thanks!

The channel method only really creates a channel and does not save it anywhere. When you call subscribe, then the realtime client knows it exists and should receive events. Originally it was named createChannel, but to match the other client libs it was renamed.

I can look into making this more clear.
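
As a sketch of the two behaviours suggested in this thread (fail fast on a duplicate topic, or return the existing entry), assuming a hypothetical `ChannelRegistry` that is not part of supabase-kt:

```kotlin
// Hypothetical registry for illustration; neither function exists in supabase-kt.
class ChannelRegistry {
    private val byTopic = mutableMapOf<String, String>() // topic -> presence key

    // Option 1: fail fast when the topic is already subscribed.
    fun addStrict(topic: String, presenceKey: String) {
        check(topic !in byTopic) { "Already subscribed to $topic on this connection" }
        byTopic[topic] = presenceKey
    }

    // Option 2: hand back the existing entry instead of replacing it.
    fun getOrAdd(topic: String, presenceKey: String): String =
        byTopic.getOrPut(topic) { presenceKey }
}

// True when the second strict add on the same topic throws.
fun strictDuplicateFails(): Boolean {
    val registry = ChannelRegistry()
    registry.addStrict("realtime:main", "user-0")
    return runCatching { registry.addStrict("realtime:main", "user-1") }.isFailure
}

// Returns the presence key that stays registered after two getOrAdd calls.
fun reuseKeepsFirst(): String {
    val registry = ChannelRegistry()
    registry.getOrAdd("realtime:main", "user-0")
    return registry.getOrAdd("realtime:main", "user-1")
}
```

Either option makes the conflict visible to the caller: the strict variant raises an `IllegalStateException` on the second subscription, and the reuse variant keeps the first entry (`user-0`) rather than silently swapping it out.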
