Suggestion for a potential new Flow's timeout extension #4143

Open
tinder-cesardiez opened this issue May 30, 2024 · 4 comments

Comments

tinder-cesardiez commented May 30, 2024

Considerations

  • Did you check the latest version of the library?

Yes

  • Do you actually need this feature? Maybe restructuring your code would neatly eliminate the problem the feature would be solving.

I'm not sure, I just wanted to bring this up since my teammates found this extension to be useful.

Use Case

Simplify Flow's Timeout extension fallbacks

The Shape of the API

fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}

Usage

someFlow().timeout(X.seconds) {
    emitAll(someFallbackFlow())
}
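
As a self-contained illustration of the usage above, the sketch below repeats the proposed extension and applies it to a flow that emits two values and then stalls. The upstream `stallingFlow()`, the 100 ms timeout, and the `-1` fallback value are assumptions made up for this example; they do not come from the proposal itself.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// The extension proposed in this issue, repeated so the example compiles on its own.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) onTimeout(cause) else throw cause
}

// Hypothetical upstream: emits two values, then never emits again.
fun stallingFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    awaitCancellation()
}

fun collectWithFallback(): List<Int> = runBlocking {
    stallingFlow()
        .timeout(100.milliseconds) { emit(-1) } // fallback value chosen for illustration
        .toList()
}

fun main() {
    println(collectWithFallback()) // [1, 2, -1]
}
```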

Prior Art (Why - Thought Process)

When I looked at the timeout documentation, I noticed that I would need to copy and paste the documentation sample to handle the fallback the way it's intended. It seemed worth offering developers an alternative that makes this a little easier; otherwise we could end up with many timeout blocks followed by nearly identical catch logic. Ideally, exceptions should be handled as close to their source as possible, so the code sample made a lot of sense to me. Why not automate it a bit?

If we think the feature request makes sense, I'm happy to go ahead and make a code contribution. I just wanted to make sure this made sense beforehand.

Timeout Related Sources / Issues

@tinder-cesardiez
Author

@dkhalanskyjb Do you think it'd make sense to make a contribution for this?

fzhinkin commented Jun 5, 2024

@tinder-cesardiez, both core maintainers are on vacation, so it may take a while before the proposal is evaluated.

@dkhalanskyjb
Collaborator

The exact suggestion has a flaw. Consider this code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}


fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
    .timeout(Duration.INFINITE) {
        println("Should be printed an infinity later")
    }.collect {
    }
}

(runnable version: https://pl.kotl.in/_s0UsTsR4)

The timeout overload that takes a lambda will catch the exception thrown by an unrelated timeout, so the println runs as soon as the 50-millisecond timeout fires. This can be a real concern when you have several independent parameterized timeouts for one flow.

On the other hand, using timeout with catch is completely transparent: timeout throws an exception on a timeout, and catch catches it, obviously.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
    .timeout(Duration.INFINITE)
    .catch { cause ->
        if (cause is TimeoutCancellationException) {
            println("One of the timeouts fired.")
        } else {
            throw cause
        }
    }.collect {
    }
}

This transparency and predictability are pretty valuable. If we introduce a timeout with a lambda, I'm pretty sure we'd want to avoid this gotcha, but then we would have to introduce some new logic that's not just a combination of the existing operators.

This is not a huge issue if this new operator is actually useful: it has some learning curve on its own, but it's still a fairly straightforward API. Is it useful? In which scenarios? Looking for this exact pattern, I didn't find many people using it: https://grep.app/search?q=timeout%5C%28%5B%5E%29%5D%2A%5C%29%5Cs%2A%5C.catch%5Cs%2A%7B&regexp=true&case=true&filter[lang][0]=Kotlin
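
To make the "new logic" mentioned above concrete, here is one possible sketch of an operator that runs its fallback only when its own timer expires. The name `timeoutOrElse` is hypothetical and is not part of kotlinx.coroutines, and the implementation (a producer coroutine plus `withTimeoutOrNull` around each receive) is just one way this could be done under those assumptions, not how the library would necessarily do it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical operator (not part of kotlinx.coroutines): runs [onTimeout] only when
// this operator's own timer expires, not when an upstream timeout throws.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.timeoutOrElse(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.() -> Unit
): Flow<T> = flow {
    val downstream = this
    val timedOut = coroutineScope {
        // Collect upstream in a child coroutine and hand values over one at a time.
        val values = buffer(Channel.RENDEZVOUS).produceIn(this)
        var expired = false
        while (true) {
            // The block never returns null, so null means our own timer fired.
            val result = withTimeoutOrNull(timeout) { values.receiveCatching() }
            if (result == null) {
                expired = true
                values.cancel() // stop collecting upstream
                break
            }
            if (result.isClosed) {
                // Upstream finished; rethrow its failure, including foreign timeouts.
                result.exceptionOrNull()?.let { throw it }
                break
            }
            downstream.emit(result.getOrThrow())
        }
        expired
    }
    if (timedOut) onTimeout()
}

// Same shape as the example above: an unrelated 50 ms timeout sits upstream.
@OptIn(FlowPreview::class)
fun demo(): Pair<Boolean, Throwable?> = runBlocking {
    var fallbackRan = false
    var caught: Throwable? = null
    flow<Int> { awaitCancellation() }
        .timeout(50.milliseconds)                          // unrelated upstream timeout
        .timeoutOrElse(10.seconds) { fallbackRan = true }  // should not run
        .catch { cause -> caught = cause }
        .collect()
    fallbackRan to caught
}

fun main() {
    val (fallbackRan, caught) = demo()
    println("fallback ran: $fallbackRan, caught: ${caught?.let { it::class.simpleName }}")
}
```

In this sketch the upstream `TimeoutCancellationException` travels past `timeoutOrElse` to the downstream `catch`, which is the behavior the plain `timeout(...).catch { ... }` wrapper cannot provide. The cost is exactly what the comment describes: a separate implementation rather than a composition of existing operators.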

@qwwdfsad added the flow label on Jul 10, 2024
@pablobaxter
Contributor

@tinder-cesardiez, this is similar to how I originally thought about this flow operator, until @qwwdfsad brought up that it was non-orthogonal and referenced this comment: #2745 (review)

TL;DR: Flow operators should each have one job and be composable with other operators. This suggestion goes against the one-job-per-operator idea, because you would now have a timeout operator that can also catch exceptions and re-emit.
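
One way to cut the repeated boilerplate while keeping each operator to a single job is to leave `timeout` untouched and factor only the catch logic into a helper. The sketch below assumes a hypothetical `catchTimeout` extension (not part of kotlinx.coroutines); the upstream flow and the `-1` fallback are likewise made up for illustration. Its name makes it explicit that it handles any `TimeoutCancellationException` arriving from upstream, not just one particular timeout.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper (not part of kotlinx.coroutines): it only catches.
// It handles any TimeoutCancellationException from upstream and rethrows everything else.
fun <T> Flow<T>.catchTimeout(
    onTimeout: suspend FlowCollector<T>.(cause: TimeoutCancellationException) -> Unit
): Flow<T> = catch { cause ->
    if (cause is TimeoutCancellationException) onTimeout(cause) else throw cause
}

@OptIn(FlowPreview::class)
fun composed(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        awaitCancellation() // stall so that the timeout fires
    }
        .timeout(100.milliseconds)  // one job: fail when upstream is silent for too long
        .catchTimeout { emit(-1) }  // one job: recover from timeout failures
        .toList()
}

fun main() {
    println(composed()) // [1, -1]
}
```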
