Some way of subscribing to a shared flow without slowing down others #4224

Open
kunyavskiy opened this issue Sep 9, 2024 · 7 comments

Comments

@kunyavskiy
Contributor

Use case

I have a SharedFlow, which shares some complex computations with many consumers. Basically, I have two types of consumers: some important internal services of the application and web clients, which read data over HTTP or websocket.

Events in the flow are diffs, with a snapshot sent once in onSubscribe. None of the consumers is capable of skipping events. Web clients can reconnect to receive a new snapshot; internal services must connect only once. Also, internal services are quite fast and well behaved, while web clients can vary because of connection issues or simply a browser that is too slow to draw things.

And here I have a problem organizing this kind of subscription. I either need to put a large buffer before shareIn, which leads to unbounded memory usage, or, with a small buffer, I need to allow a slow client to affect other consumers and even suspend the computation.
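For illustration only (the buffer size and timings below are made up), a minimal sketch of the dilemma with a small buffer and the default BufferOverflow.SUSPEND: one slow subscriber suspends the emitter, and the fast subscriber is held back with it.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Small buffer with the default BufferOverflow.SUSPEND: once the slowest
    // subscriber falls two elements behind, emit() suspends for everyone.
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 2)

    val fast = launch { events.collect { println("fast subscriber got $it") } }
    val slow = launch { events.collect { println("slow subscriber got $it"); delay(300) } }
    events.subscriptionCount.first { it == 2 } // wait until both have subscribed

    repeat(5) {
        events.emit(it) // suspends as soon as the slow subscriber is 2 elements behind
        println("emitted $it")
    }
    fast.cancel()
    slow.cancel()
}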

The Shape of the API

Ideally, I would like to mark some SharedFlow subscriptions as "unimportant". The effect would be: if the only subscribers that have not yet consumed an element that needs to be removed from the buffer are "unimportant" ones, they are simply canceled, and the emitter is not suspended.

I'm not sure what a good API for that would be, but it probably involves something like the .onSubscribe function: an operator that wraps the shared flow in a special SharedFlow implementation.

Maybe another option is resubscribing instead of canceling the consumer. That looks hardly distinguishable from DROP_OLDEST buffering, but with a non-trivial onSubscribe action it would behave differently.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Sep 10, 2024

That's an interesting proposal.

If I understand your use case correctly, what you need is not "important" and "unimportant" subscriptions, but instead a way to restart the flow collection completely if the buffer is full. If we had that, you'd be able to solve your problem using two different flows for different subscriber classes like this:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.*

fun main() {
    runBlocking {
        val originalFlow = MutableSharedFlow<Int>()

        // RESTART_SUBSCRIBER is hypothetical: an overflow strategy that would
        // restart the collection from scratch instead of suspending the emitter.
        val flowForSlowAndUnimportantSubscribers: Flow<Int> =
            originalFlow.buffer(onBufferOverflow = RESTART_SUBSCRIBER)

        withContext(Dispatchers.Default) {
            val i = AtomicInteger(0)
            val quickCollector = launch {
                originalFlow.onSubscription { println("Starting with ${i.get()}") }.collect {
                    println("The quick collector received $it")
                }
            }
            val slowCollector = launch {
                // In the hypothetical version, this would collect
                // flowForSlowAndUnimportantSubscribers instead of originalFlow.
                originalFlow.onSubscription { println("Starting with ${i.get()}") }.collect {
                    println("The slow and stupid collector received $it")
                    delay(100)
                }
            }
            repeat(5) {
                delay(30)
                i.set(it)
                originalFlow.emit(it)
            }
            quickCollector.cancel()
            slowCollector.cancel()
        }
    }
}

It seems like this doesn't even have anything to do with flow sharing; it's just a plain Flow operation.

Does that sound about right?
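To make that concrete, here is one rough sketch of what such a plain-Flow operator could look like. The operator name and the whole implementation below are invented for illustration and are not part of kotlinx.coroutines; it restarts the upstream collection instead of suspending it whenever the downstream falls too far behind.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

private class BufferOverflowRestart : Exception()

/**
 * Illustration only: re-collects the upstream flow from scratch whenever the
 * downstream falls more than [capacity] (>= 1) elements behind, instead of
 * suspending the upstream. With a shared upstream, "from scratch" means
 * resubscribing, so a non-trivial onSubscription block runs again.
 */
fun <T> Flow<T>.restartCollectionOnOverflow(capacity: Int): Flow<T> = flow {
    while (true) {
        val buffer = Channel<T>(capacity)
        try {
            coroutineScope {
                launch {
                    collect { value ->
                        // Instead of suspending the upstream, abandon this attempt.
                        if (buffer.trySend(value).isFailure) throw BufferOverflowRestart()
                    }
                    buffer.close()
                }
                for (value in buffer) emit(value)
            }
            return@flow // upstream completed normally
        } catch (e: BufferOverflowRestart) {
            // Drop the stale buffer and resubscribe to the upstream.
        }
    }
}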

@dkhalanskyjb
Collaborator

I hacked something together:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.*

fun main() {
    runBlocking {
        val originalFlow = MutableSharedFlow<Int>()

        withContext(Dispatchers.Default) {
            val i = AtomicInteger(0)
            val quickCollector = launch {
                originalFlow.onSubscription { println("Starting a quick collector with ${i.get()}") }.collect {
                    println("The quick collector received $it")
                }
            }
            val slowCollector = launch {
                while (true) {
                    try {
                        originalFlow
                            .onSubscription { println("Starting a slow collector with ${i.get()}") }
                            .collectLatest {
                                try {
                                    println("The slow and stupid collector received $it")
                                    delay(100)
                                } catch (e: CancellationException) {
                                    // A newer value arrived before this one was processed:
                                    // escalate the per-element cancellation into a restart.
                                    throw RestartException()
                                }
                            }
                    } catch (e: RestartException) {
                        // Resubscribe from scratch, re-running onSubscription.
                    }
                }
            }
            repeat(5) {
                delay(30)
                i.set(it)
                originalFlow.emit(it)
            }
            quickCollector.cancel()
            slowCollector.cancel()
        }
    }
}

class RestartException : Exception()

(Runnable version: https://pl.kotl.in/g3o0TwSkN)

Does this do what you want?

@kunyavskiy
Contributor Author

Yes. It would probably also be reasonable to let the caller choose between throwing an exception from collect and resubscribing; I see use cases for both. Either of them solves my use case.

@dkhalanskyjb
Collaborator

Will the second (hacked-together) approach work for you in the meantime (or maybe even long-term), or do you see some issues with it?

One thing to note is that subscribers that are slow to react to cancellation (e.g., those running blocking, non-interruptible code) can still slow down the whole pipeline. I'm not sure how crucial this is.
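To make this caveat concrete, a tiny sketch (timings invented): a coroutine that blocks the thread reacts to cancellation only once the blocking call returns, so any scheme that relies on cancelling it still waits that long.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val cooperative = launch(Dispatchers.Default) { delay(5_000) }     // suspends, promptly cancellable
    val blocking = launch(Dispatchers.Default) { Thread.sleep(5_000) } // blocks, ignores cancellation

    delay(100) // let both coroutines start

    var t = System.currentTimeMillis()
    cooperative.cancelAndJoin()
    println("cancelling the delay()-based one took ${System.currentTimeMillis() - t} ms")        // ~0 ms

    t = System.currentTimeMillis()
    blocking.cancelAndJoin()
    println("cancelling the Thread.sleep()-based one took ${System.currentTimeMillis() - t} ms") // ~4900 ms
}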

@kunyavskiy
Contributor Author

As a workaround for now, I'm fine with .buffer(Int.MAX_VALUE) and hoping the clients are not hung, just slow.
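For reference, a minimal sketch of that workaround (sizes and timings invented): the per-subscriber buffer() detaches the slow collector from the emitter, at the cost of a buffer that can grow without bound if the client never catches up.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 1)

    // buffer(Int.MAX_VALUE) inserts an unlimited channel between the shared
    // flow and this collector, so the emitter only ever waits for the fast
    // internal collector of buffer(), never for the slow client itself.
    val slowWebClient = launch {
        events.buffer(Int.MAX_VALUE).collect {
            delay(200) // simulate a slow client
            println("web client got $it")
        }
    }
    events.subscriptionCount.first { it == 1 }

    repeat(10) {
        val start = System.currentTimeMillis()
        events.emit(it)
        println("emit($it) returned after ${System.currentTimeMillis() - start} ms") // ~0 ms each
    }
    slowWebClient.cancel()
}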

Maybe there is an option not to wait until they are cancelled, but rather to cancel them only when they next try to read an element from the buffer? This is where my understanding of the coroutines internals is definitely not good enough.

@hoangchungk53qx1

I think you can buffer the events on it.

@hoangchungk53qx1

[quotes dkhalanskyjb's earlier comment in full, from "I hacked something together:" through "Does this do what you want?"]

Good idea, but I have a question: why not buffer all current shared flow events and replay them for new consumers?
