diff --git a/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt b/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt index ff40ec9a7..d25148ff0 100644 --- a/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt +++ b/android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/RibCoroutineWorker.kt @@ -246,12 +246,20 @@ internal constructor( ) : Worker { override fun onStart(lifecycle: WorkerScopeProvider) { - // We can start it undispatched because Worker binder will already call `onStart` in correct - // context, - // but we still want to pass in `coroutineDispatcher` to resume from suspensions in `onStart` in + // We start it undispatched to keep the behavior of immediate binding of Worker when + // WorkerBinder.bind is called. + // We still want to pass in `coroutineContext` to resume from suspensions in `onStart` in // correct context. - lifecycle.coroutineScope.launch(coroutineContext, start = CoroutineStart.UNDISPATCHED) { - supervisorScope { ribCoroutineWorker.onStart(this) } + lifecycle.coroutineScope.launch(coroutineContext, CoroutineStart.UNDISPATCHED) { + supervisorScope { + ribCoroutineWorker.onStart(this) + // Keep this scope alive until cancelled. + // This is particularly important for cases where we do not launch long-running coroutines + // with scope, but instead install some completion handler that we expect to be called at + // worker + // unbinding. This is the case with Rx subscriptions with 'autoDispose(scope)' + awaitCancellation() + } } } diff --git a/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt b/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt index 89c4b3be3..6301eba30 100644 --- a/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt +++ b/android/libraries/rib-base/src/test/kotlin/com/uber/rib/core/RibCoroutineWorkerTest.kt @@ -16,6 +16,8 @@ package com.uber.rib.core import com.google.common.truth.Truth.assertThat +import com.uber.autodispose.coroutinesinterop.autoDispose +import io.reactivex.subjects.PublishSubject import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CancellationException @@ -46,6 +48,7 @@ import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import org.junit.Rule import org.junit.Test +import org.mockito.kotlin.mock private const val ON_START_DELAY_DURATION_MILLIS = 100L private const val INNER_COROUTINE_DELAY_DURATION_MILLIS = 200L @@ -180,6 +183,21 @@ class RibCoroutineWorkerTest { } } + @Test + fun asWorker_autoDisposeWithCoroutineScope_lateEmissionIsReceivedBySubscriber() = runTest { + val router = mock>() + val interactor = object : Interactor>() {} + val relay = PublishSubject.create() + var gotEmission = false + val worker = + RibCoroutineWorker { relay.autoDispose(this).subscribe { gotEmission = true } }.asWorker() + InteractorHelper.attach(interactor, Any(), router, null) + WorkerBinder.bind(interactor, worker) + runCurrent() + relay.onNext(Unit) + assertThat(gotEmission).isTrue() + } + @Test fun testHelperFunction() = runTest { // Sanity - assert initial state.