Skip to content

Commit

Permalink
PM-11753: Vault flows should listen to vault onlock state to ensure d… (
Browse files Browse the repository at this point in the history
  • Loading branch information
david-livefront authored Nov 6, 2024
1 parent 816b976 commit ec85e7a
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.x8bit.bitwarden.data.platform.repository.util

import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
Expand All @@ -31,3 +33,34 @@ fun <T, R> MutableStateFlow<T>.observeWhenSubscribedAndLoggedIn(
.flatMapLatest { activeUserId ->
activeUserId?.let(observer) ?: flow { awaitCancellation() }
}

/**
* Invokes the [observer] callback whenever the user is logged in, the active user changes, the
* vault for the user changes, and there are subscribers to the [MutableStateFlow]. The flow from
* all previous calls to the `observer` is canceled whenever the `observer` is re-invoked, there
* is no active user (logged-out), there are no subscribers to the [MutableStateFlow], or the vault
* is not unlocked.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> MutableStateFlow<T>.observeWhenSubscribedAndUnlocked(
userStateFlow: Flow<UserStateJson?>,
vaultUnlockFlow: Flow<List<VaultUnlockData>>,
observer: (activeUserId: String) -> Flow<R>,
): Flow<R> =
combine(
this.subscriptionCount.map { it > 0 }.distinctUntilChanged(),
userStateFlow.map { it?.activeUserId }.distinctUntilChanged(),
userStateFlow.map { it?.activeUserId }
.distinctUntilChanged()
.filterNotNull()
.flatMapLatest { activeUserId ->
vaultUnlockFlow
.map { it.any { it.userId == activeUserId } }
.distinctUntilChanged()
},
) { isSubscribed, activeUserId, isUnlocked ->
activeUserId.takeIf { isSubscribed && isUnlocked }
}
.flatMapLatest { activeUserId ->
activeUserId?.let(observer) ?: flow { awaitCancellation() }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.x8bit.bitwarden.data.platform.repository.util.combineDataStates
import com.x8bit.bitwarden.data.platform.repository.util.map
import com.x8bit.bitwarden.data.platform.repository.util.mapNullable
import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndLoggedIn
import com.x8bit.bitwarden.data.platform.repository.util.observeWhenSubscribedAndUnlocked
import com.x8bit.bitwarden.data.platform.repository.util.updateToPendingOrLoading
import com.x8bit.bitwarden.data.platform.util.asFailure
import com.x8bit.bitwarden.data.platform.util.asSuccess
Expand Down Expand Up @@ -222,7 +223,13 @@ class VaultRepositoryImpl(
// Cancel any ongoing sync request and clear the vault data in memory every time
// the user switches or the vault is locked for the active user.
merge(
authDiskSource.userSwitchingChangesFlow,
authDiskSource
.userSwitchingChangesFlow
.onEach {
// DomainState is not part of the locked data but should still be cleared
// when the user changes
mutableDomainsStateFlow.update { DataState.Loading }
},
vaultLockManager
.vaultUnlockDataStateFlow
.filter { vaultUnlockDataList ->
Expand All @@ -238,7 +245,10 @@ class VaultRepositoryImpl(

// Setup ciphers MutableStateFlow
mutableCiphersStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId ->
.observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskCiphers(activeUserId)
}
.launchIn(unconfinedScope)
Expand All @@ -250,19 +260,28 @@ class VaultRepositoryImpl(
.launchIn(unconfinedScope)
// Setup folders MutableStateFlow
mutableFoldersStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId ->
.observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskFolders(activeUserId)
}
.launchIn(unconfinedScope)
// Setup collections MutableStateFlow
mutableCollectionsStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId ->
.observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskCollections(activeUserId)
}
.launchIn(unconfinedScope)
// Setup sends MutableStateFlow
mutableSendDataStateFlow
.observeWhenSubscribedAndLoggedIn(authDiskSource.userStateFlow) { activeUserId ->
.observeWhenSubscribedAndUnlocked(
userStateFlow = authDiskSource.userStateFlow,
vaultUnlockFlow = vaultUnlockDataStateFlow,
) { activeUserId ->
observeVaultDiskSends(activeUserId)
}
.launchIn(unconfinedScope)
Expand Down Expand Up @@ -305,7 +324,6 @@ class VaultRepositoryImpl(

private fun clearUnlockedData() {
mutableCiphersStateFlow.update { DataState.Loading }
mutableDomainsStateFlow.update { DataState.Loading }
mutableFoldersStateFlow.update { DataState.Loading }
mutableCollectionsStateFlow.update { DataState.Loading }
mutableSendDataStateFlow.update { DataState.Loading }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.x8bit.bitwarden.data.platform.repository.util

import app.cash.turbine.test
import com.x8bit.bitwarden.data.auth.datasource.disk.model.UserStateJson
import com.x8bit.bitwarden.data.vault.repository.model.VaultUnlockData
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.flow.MutableStateFlow
Expand Down Expand Up @@ -50,6 +51,74 @@ class StateFlowExtensionsTest {
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())

job.cancel()
// Job is canceled, we should have no more subscribers
assertEquals(0, awaitItem())
}
}

@Suppress("MaxLineLength")
@Test
fun `observeWhenSubscribedAndUnlocked should observe the given flow depending on the state of the source user and vault unlock flow`() =
runTest {
val userStateFlow = MutableStateFlow<UserStateJson?>(null)
val vaultUnlockFlow = MutableStateFlow<List<VaultUnlockData>>(emptyList())
val observerStateFlow = MutableStateFlow("")
val sourceMutableStateFlow = MutableStateFlow(Unit)

assertEquals(0, observerStateFlow.subscriptionCount.value)
sourceMutableStateFlow
.observeWhenSubscribedAndUnlocked(
userStateFlow = userStateFlow,
vaultUnlockFlow = vaultUnlockFlow,
observer = { observerStateFlow },
)
.launchIn(backgroundScope)

observerStateFlow.subscriptionCount.test {
// No subscriber to start
assertEquals(0, awaitItem())

userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_1234"
}
// Still none, since no one has subscribed to the testMutableStateFlow
expectNoEvents()

vaultUnlockFlow.value = listOf(
VaultUnlockData(
userId = "user_id_1234",
status = VaultUnlockData.Status.UNLOCKED,
),
)

// Still none, since no one has subscribed to the testMutableStateFlow
expectNoEvents()

val job = sourceMutableStateFlow.launchIn(backgroundScope)
// Now we subscribe to the observer flow since have a active user and a listener
assertEquals(1, awaitItem())

userStateFlow.value = mockk<UserStateJson> {
every { activeUserId } returns "user_id_4321"
}
// The user changed, so we clear the previous observer but then resubscribe
// with the new user ID
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())

vaultUnlockFlow.value = listOf(
VaultUnlockData(
userId = "user_id_4321",
status = VaultUnlockData.Status.UNLOCKED,
),
)

// The VaultUnlockData changed, so we clear the previous observer but then resubscribe
// with the new data
assertEquals(0, awaitItem())
assertEquals(1, awaitItem())

job.cancel()
// Job is canceled, we should have no more subscribers
assertEquals(0, awaitItem())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ class VaultRepositoryTest {
)
setVaultToUnlocked(userId = userId)

ciphersFlow.tryEmit(listOf(createMockCipher(number = 1)))
collectionsFlow.tryEmit(listOf(createMockCollection(number = 1)))
foldersFlow.tryEmit(listOf(createMockFolder(number = 1)))
sendsFlow.tryEmit(listOf(createMockSend(number = 1)))
domainsFlow.tryEmit(createMockDomains(number = 1))

assertEquals(
DataState.Loaded(listOf(createMockCipherView(number = 1))),
ciphersStateFlow.awaitItem(),
Expand Down Expand Up @@ -484,7 +490,8 @@ class VaultRepositoryTest {
assertEquals(DataState.Loading, collectionsStateFlow.awaitItem())
assertEquals(DataState.Loading, foldersStateFlow.awaitItem())
assertEquals(DataState.Loading, sendsStateFlow.awaitItem())
assertEquals(DataState.Loading, domainsStateFlow.awaitItem())
// We already have the domain data
domainsStateFlow.expectNoEvents()
}
}

Expand Down Expand Up @@ -1804,6 +1811,9 @@ class VaultRepositoryTest {
settingsDiskSource.getLastSyncTime(userId = userId)
} returns clock.instant()

mutableVaultStateFlow.update {
listOf(VaultUnlockData(userId, VaultUnlockData.Status.UNLOCKED))
}
fakeAuthDiskSource.userState = MOCK_USER_STATE
setupEmptyDecryptionResults()
setupVaultDiskSourceFlows(
Expand Down Expand Up @@ -1960,6 +1970,7 @@ class VaultRepositoryTest {
expectNoEvents()
setVaultToUnlocked(userId = MOCK_USER_STATE.activeUserId)

sendsFlow.tryEmit(emptyList())
assertEquals(DataState.Loaded<SendView?>(null), awaitItem())
sendsFlow.tryEmit(listOf(createMockSend(number = sendId)))
assertEquals(DataState.Loaded<SendView?>(sendView), awaitItem())
Expand Down Expand Up @@ -4544,6 +4555,14 @@ class VaultRepositoryTest {
*/
private fun setVaultToUnlocked(userId: String) {
mutableUnlockedUserIdsStateFlow.update { it + userId }
mutableVaultStateFlow.tryEmit(
listOf(
VaultUnlockData(
userId,
VaultUnlockData.Status.UNLOCKED,
),
),
)
}

/**
Expand Down

0 comments on commit ec85e7a

Please sign in to comment.