Skip to content

Commit

Permalink
Improve debugging experience of leaked shift calls (#2884)
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev authored Jan 12, 2023
1 parent 9ca5f8c commit 7156f4a
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 259 deletions.
4 changes: 4 additions & 0 deletions arrow-libs/core/arrow-core/api/arrow-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -3023,6 +3023,10 @@ public final class arrow/core/continuations/ResultKt {
public abstract class arrow/core/continuations/ShiftCancellationException : arrow/core/continuations/CancellationExceptionNoTrace {
}

public final class arrow/core/continuations/ShiftLeakedException : java/lang/IllegalStateException {
public fun <init> ()V
}

public final class arrow/core/continuations/Suspend : arrow/core/continuations/ShiftCancellationException {
public fun <init> (Larrow/core/continuations/Token;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public final fun getRecover ()Lkotlin/jvm/functions/Function2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public fun <R, A> eagerEffect(f: suspend EagerEffectScope<R>.() -> A): EagerEffe
private class DefaultEagerEffect<R, A>(private val f: suspend EagerEffectScope<R>.() -> A) : EagerEffect<R, A> {
override fun <B> fold(recover: (R) -> B, transform: (A) -> B): B {
val token = Token()
val isActive = AtomicRef(true)
val eagerEffectScope =
object : EagerEffectScope<R> {
// Shift away from this Continuation by intercepting it, and completing it with
Expand All @@ -180,21 +181,26 @@ private class DefaultEagerEffect<R, A>(private val f: suspend EagerEffectScope<R
// CancellationException and thus effectively recovering from the cancellation/shift.
// This means try/catch is also capable of recovering from monadic errors.
// See: EagerEffectSpec - try/catch tests
throw Eager(token, r, recover as (Any?) -> Any?)
if (isActive.get()) throw Eager(token, r, recover as (Any?) -> Any?)
else throw ShiftLeakedException()
}

return try {
suspend { transform(f(eagerEffectScope)) }
.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
suspend {
val res = f(eagerEffectScope).also { isActive.set(false) }
transform(res)
}.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
result.getOrElse { throwable ->
if (throwable is Eager && token == throwable.token) {
throwable.recover(throwable.shifted) as B
} else throw throwable
}
}) as B
} catch (e: Eager) {
if (token == e.token) e.recover(e.shifted) as B
else throw e
if (token == e.token) {
isActive.set(false)
e.recover(e.shifted) as B
} else throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import kotlin.coroutines.resumeWithException
*
* <!--- TOC -->
* [Writing a program with Effect<R, A>](#writing-a-program-with-effect<r-a>)
* [Handling errors](#handling-errors)
* [Structured Concurrency](#structured-concurrency)
* [Arrow Fx Coroutines](#arrow-fx-coroutines)
* [parZip](#parzip)
* [parTraverse](#partraverse)
* [raceN](#racen)
* [bracketCase / Resource](#bracketcase--resource)
* [KotlinX](#kotlinx)
* [withContext](#withcontext)
* [async](#async)
* [launch](#launch)
* [Strange edge cases](#strange-edge-cases)
* [Writing a program with Effect<R, A>](#writing-a-program-with-effect<r-a>)
* [Handling errors](#handling-errors)
* [Structured Concurrency](#structured-concurrency)
* [Arrow Fx Coroutines](#arrow-fx-coroutines)
* [parZip](#parzip)
* [parTraverse](#partraverse)
* [raceN](#racen)
* [bracketCase / Resource](#bracketcase--resource)
* [KotlinX](#kotlinx)
* [withContext](#withcontext)
* [async](#async)
* [launch](#launch)
* [Leaking `shift`](#leaking-shift)
* <!--- END -->
*
Expand Down Expand Up @@ -418,7 +418,7 @@ import kotlin.coroutines.resumeWithException
*
* ### KotlinX
* #### withContext
* It's always safe to call `shift` from `withContext` since it runs in place, so it has no way of leaking `shift`.
* It's always safe to call `shift` from `withContext` since it runs _in place_, so it has no way of leaking `shift`.
* When `shift` is called from within `withContext` it will cancel all `Job`s running inside the `CoroutineScope` of `withContext`.
*
* <!--- INCLUDE
Expand Down Expand Up @@ -483,6 +483,8 @@ import kotlin.coroutines.resumeWithException
* #### async
*
* When calling `shift` from `async` you should **always** call `await`, otherwise `shift` can leak out of its scope.
* So it's safe to call `shift` from `async` as long as you **always** call `await` on the `Deferred` returned by `async`,
* but we advise using Arrow Fx `parZip`, `raceN`, `parTraverse`, etc instead.
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
Expand All @@ -500,14 +502,30 @@ import kotlin.coroutines.resumeWithException
* val fa = async<Int> { shift(errorA) }
* val fb = async<Int> { shift(errorB) }
* fa.await() + fb.await()
* }.fold({ error -> error shouldBeIn listOf(errorA, errorB) }, { fail("Int can never be the result") })
* }.fold(
* { error ->
* println(error)
* error shouldBeIn listOf(errorA, errorB)
* },
* { fail("Int can never be the result") }
* )
* }
* }
* ```
* <!--- KNIT example-effect-guide-11.kt -->
* ```text
* ErrorA
* ```
*
* The example here will always print `ErrorA`, but never `ErrorB`. This is because `fa` is awaited first, and when it `shifts` it will cancel `fb`.
* If instead we used `awaitAll`, then it would print `ErrorA` or `ErrorB` due to both `fa` and `fb` being awaited in parallel.
*
* #### launch
*
* It's **not allowed** to call `shift` from within `launch`. This is because `launch` creates a separate unrelated child Job/Continuation.
* Any calls to `shift` inside of `launch` will be ignored by `effect` and result in an exception being thrown inside `launch`.
* Because KotlinX Coroutines ignores `CancellationException`, and thus swallows the `shift` call.
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
* import io.kotest.assertions.fail
Expand All @@ -519,29 +537,30 @@ import kotlin.coroutines.resumeWithException
* suspend fun main() {
* val errorA = "ErrorA"
* val errorB = "ErrorB"
* val int = 45
* effect<String, Int> {
* coroutineScope<Int> {
* launch { shift(errorA) }
* launch { shift(errorB) }
* int
* 45
* }
* }.fold({ fail("Shift can never finish") }, { it shouldBe int })
* }.fold({ fail("Shift can never finish") }, ::println)
* }
* ```
* <!--- KNIT example-effect-guide-12.kt -->
* ```text
* 45
* ```
*
* As you can see from the output, the `effect` block is still executed, but the `shift` calls inside `launch` are ignored.
*
* #### Leaking `shift`
*
* #### Strange edge cases
* **IMPORTANT:** Capturing `shift` and leaking it outside of `effect { }` and invoking it outside its scope will yield unexpected results.
*
* **NOTE**
* Capturing `shift` into a lambda, and leaking it outside of `Effect` to be invoked outside will yield unexpected results.
* Below we capture `shift` from inside the DSL, and then invoke it outside its context `EffectScope<String>`.
* Below an example of the capturing of `shift` inside a `suspend lambda`, and then invoking it outside its `effect { }` scope.
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
* import kotlinx.coroutines.Deferred
* import kotlinx.coroutines.async
* import kotlinx.coroutines.coroutineScope
*
* suspend fun main() {
* -->
Expand All @@ -553,21 +572,63 @@ import kotlin.coroutines.resumeWithException
* suspend { shift("error") }
* }.fold({ }, { leakedShift -> leakedShift.invoke() })
* ```
* <!--- KNIT example-effect-guide-13.kt -->
*
* The same violation is possible in all DSLs in Kotlin, including Structured Concurrency.
* When we invoke `leakedShift` outside of `effect { }` a special `ShiftLeakedException` is thrown to improve the debugging experience.
* The message clearly states that `shift` was leaked outside its scope, and the stacktrace will point to the exact location where `shift` was captured.
* In this case in line `9` of `example-effect-guide-13.kt`, which is stated in the second line of the stacktrace: `invokeSuspend(example-effect-guide-13.kt:9)`.
*
* ```kotlin
* val leakedAsync = coroutineScope<suspend () -> Deferred<Unit>> {
* suspend {
* async {
* println("I am never going to run, until I get called invoked from outside")
* }
* }
* }
* ```text
* Exception in thread "main" arrow.core.continuations.ShiftLeakedException:
* shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
* This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.
*
* leakedAsync.invoke().await()
* See: Effect KDoc for additional information.
* at arrow.core.continuations.FoldContinuation.shift(Effect.kt:770)
* at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invokeSuspend(example-effect-guide-13.kt:9)
* at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invoke(example-effect-guide-13.kt)
* ```
* <!--- KNIT example-effect-guide-13.kt -->
*
* An example with KotlinX Coroutines launch. Which can _concurrently_ leak `shift` outside of its scope.
* In this case by _delaying_ the invocation of `shift` by `3.seconds`,
* we can see that the `ShiftLeakedException` is again thrown when `shift` is invoked.
*
* <!--- INCLUDE
* import kotlinx.coroutines.launch
* import kotlinx.coroutines.delay
* import kotlinx.coroutines.coroutineScope
* import kotlinx.coroutines.runBlocking
* import arrow.core.continuations.effect
* import kotlin.time.Duration.Companion.seconds
*
* fun main(): Unit = runBlocking {
* -->
* <!--- SUFFIX
* }
* -->
* ```kotlin
* effect<String, Int> {
* launch {
* delay(3.seconds)
* shift("error")
* }
* 1
* }.fold(::println, ::println)
* ```
* ```text
* 1
* Exception in thread "main" arrow.core.continuations.ShiftLeakedException:
* shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
* This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.
*
* See: Effect KDoc for additional information.
* at arrow.core.continuations.FoldContinuation.shift(Effect.kt:780)
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt$main$1$1$1$1.invokeSuspend(example-effect-guide-14.kt:17)
* at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) <13 internal lines>
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt:11)
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt)
* ```
* <!--- KNIT example-effect-guide-14.kt -->
*/
public interface Effect<out R, out A> {
/**
Expand Down Expand Up @@ -716,17 +777,21 @@ internal class FoldContinuation<R, B>(
private val error: suspend (Throwable) -> B,
private val parent: Continuation<B>,
) : Continuation<B>, Token(), EffectScope<R> {

constructor(ignored: Token, context: CoroutineContext, parent: Continuation<B>) : this(context, { throw it }, parent)
constructor(
ignored: Token,
context: CoroutineContext,
error: suspend (Throwable) -> B,
parent: Continuation<B>,
) : this(context, error, parent)

lateinit var recover: suspend (R) -> Any?


private val isActive: AtomicRef<Boolean> = AtomicRef(true)

internal fun complete(): Boolean = isActive.getAndSet(false)

// Shift away from this Continuation by intercepting it, and completing it with
// ShiftCancellationException
// This is needed because this function will never yield a result,
Expand All @@ -739,8 +804,9 @@ internal class FoldContinuation<R, B>(
// CancellationException and thus effectively recovering from the cancellation/shift.
// This means try/catch is also capable of recovering from monadic errors.
// See: EffectSpec - try/catch tests
throw Suspend(this, r, recover as suspend (Any?) -> Any?)

if (isActive.get()) throw Suspend(this, r, recover as suspend (Any?) -> Any?)
else throw ShiftLeakedException()

// In contrast to `createCoroutineUnintercepted this doesn't create a new ContinuationImpl
private fun (suspend () -> B).startCoroutineUnintercepted() {
try {
Expand All @@ -753,15 +819,20 @@ internal class FoldContinuation<R, B>(
parent.resumeWithException(e)
}
}

override fun resumeWith(result: Result<B>) {
result.fold(parent::resume) { throwable ->
when {
throwable is Suspend && this === throwable.token ->
throwable is Suspend && this === throwable.token -> {
complete()
suspend { throwable.recover(throwable.shifted) as B }.startCoroutineUnintercepted()

}

throwable is Suspend -> parent.resumeWith(result)
else -> suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted()
else -> {
complete()
suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted()
}
}
}
}
Expand Down Expand Up @@ -800,12 +871,12 @@ internal class FoldContinuation<R, B>(
public fun <R, A> effect(f: suspend EffectScope<R>.() -> A): Effect<R, A> = DefaultEffect(f)

private class DefaultEffect<R, A>(val f: suspend EffectScope<R>.() -> A) : Effect<R, A> {

override suspend fun <B> fold(
recover: suspend (shifted: R) -> B,
transform: suspend (value: A) -> B,
): B = fold({ throw it }, recover, transform)

// We create a `Token` for fold Continuation, so we can properly differentiate between nested folds
override suspend fun <B> fold(
error: suspend (error: Throwable) -> B,
Expand All @@ -816,18 +887,33 @@ private class DefaultEffect<R, A>(val f: suspend EffectScope<R>.() -> A) : Effec
val shift = FoldContinuation<R, B>(cont.context, error, cont)
shift.recover = recover
try {
val fold: suspend EffectScope<R>.() -> B = { transform(f(this)) }
val fold: suspend EffectScope<R>.() -> B = {
val res = f(this).also { shift.complete() }
transform(res)
}
fold.startCoroutineUninterceptedOrReturn(shift, shift)
} catch (e: Suspend) {
if (shift === e.token) {
shift.complete()
val f: suspend () -> B = { e.recover(e.shifted) as B }
f.startCoroutineUninterceptedOrReturn(cont)
} else throw e
} catch (e: Throwable) {
shift.complete()
val f: suspend () -> B = { error(e.nonFatalOrThrow()) }
f.startCoroutineUninterceptedOrReturn(cont)
}
}
}

public suspend fun <A> Effect<A, A>.merge(): A = fold(::identity, ::identity)

public class ShiftLeakedException : IllegalStateException(
"""
shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.
See: Effect KDoc for additional information.
""".trimIndent()
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.identity
import arrow.core.left
import arrow.core.right
import io.kotest.assertions.fail
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
Expand Down Expand Up @@ -133,4 +134,12 @@ class EagerEffectSpec : StringSpec({
}.runCont()
} shouldBe Either.Left(e)
}

"shift leaked results in ShiftLeakException" {
shouldThrow<ShiftLeakedException> {
effect {
suspend { shift<Unit>("failure") }
}.fold(::println) { it.invoke() }
}
}
})
Loading

0 comments on commit 7156f4a

Please sign in to comment.