Skip to content

Commit

Permalink
Fix concurrency mutex usage
Browse files Browse the repository at this point in the history
Replace the usage of 'mutex.withLock(this)' with 'mutex.withLock' in multiple files, preventing concurrency issues. This change affects the IndexedDBStore, RocksdbPersistentMap, JsonCollection implementations, and relevant test cases.
  • Loading branch information
fscarponi committed Sep 13, 2024
1 parent 02fd3fb commit 1f38ea4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class JsonCollection(
findUsingIndex(selector, value)
?: iterateAll().filter { it.select(selector) == value }

public suspend fun removeById(id: Long): JsonObject? = mutex.withLock(this) { removeByIdUnsafe(id) }
public suspend fun removeById(id: Long): JsonObject? = mutex.withLock { removeByIdUnsafe(id) }

private suspend fun removeByIdUnsafe(id: Long): JsonObject? {
val jsonString = collection.remove(id) ?: return null
Expand All @@ -116,7 +116,7 @@ public class JsonCollection(
return jsonObject
}

public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock(this) { insertUnsafe(value) }
public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock { insertUnsafe(value) }

private suspend fun insertUnsafe(value: JsonObject): JsonObject {
val id = value.id ?: generateId()
Expand Down Expand Up @@ -149,7 +149,7 @@ public class JsonCollection(
?.toMap()

override suspend fun dropIndex(selector: String): Unit =
mutex.withLock(this) {
mutex.withLock {
store.deleteMap("$name.$selector")
indexMap.update(name, emptyList()) { it - selector }
}
Expand Down Expand Up @@ -188,7 +188,7 @@ public class JsonCollection(
fieldSelector: String,
fieldValue: JsonElement,
update: suspend (JsonObject) -> JsonObject,
): Boolean = mutex.withLock(this) { updateWhereUnsafe(fieldSelector, fieldValue, update) }
): Boolean = mutex.withLock { updateWhereUnsafe(fieldSelector, fieldValue, update) }

private suspend fun JsonCollection.updateWhereUnsafe(
fieldSelector: String,
Expand Down Expand Up @@ -218,7 +218,7 @@ public class JsonCollection(
upsert: Boolean,
update: JsonObject,
): Boolean =
mutex.withLock(this) {
mutex.withLock {
val updated = updateWhereUnsafe(fieldSelector, fieldValue) { update }
if (!updated && upsert) {
insertUnsafe(update)
Expand All @@ -231,7 +231,7 @@ public class JsonCollection(
fieldSelector: String,
fieldValue: JsonElement,
): Unit =
mutex.withLock(this) {
mutex.withLock {
val ids =
getIndexInternal(fieldSelector)
?.get(fieldValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
override suspend fun put(
key: String,
value: String,
) = mutex.withLock(this) { unsafePut(key, value) }
) = mutex.withLock { unsafePut(key, value) }

private suspend fun IndexedDBMap.unsafePut(
key: String,
Expand All @@ -58,7 +58,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
}

override suspend fun remove(key: String): String? =
mutex.withLock(this) {
mutex.withLock {
val previous = get(key)
keyval.del("${prefix}_$key").await()
previous
Expand All @@ -71,7 +71,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
value: String,
updater: (String) -> String,
): UpdateResult<String> =
mutex.withLock(this) {
mutex.withLock {
val oldValue = get(key)
val newValue = oldValue?.let(updater) ?: value
keyval.set("${prefix}_$key", newValue).await()
Expand All @@ -82,7 +82,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
key: String,
defaultValue: () -> String,
): String =
mutex.withLock(this) {
mutex.withLock {
get(key) ?: defaultValue().also { unsafePut(key, it) }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.document.database.rocksdb

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
Expand Down Expand Up @@ -30,7 +29,7 @@ public class RocksdbPersistentMap(
override suspend fun put(
key: String,
value: String,
): String? = mutex.withLock(this) { unsafePut(key, value) }
): String? = mutex.withLock { unsafePut(key, value) }

private suspend fun unsafePut(
key: String,
Expand All @@ -44,7 +43,7 @@ public class RocksdbPersistentMap(
}

override suspend fun remove(key: String): String? =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
val prefixed = key.prefixed()
val previous = delegate[prefixed]?.decodeToString()
Expand All @@ -58,7 +57,7 @@ public class RocksdbPersistentMap(
override suspend fun clear(): Unit = delegate.deletePrefix(prefix)

override suspend fun size(): Long =
mutex.withLock(this) {
mutex.withLock {
var count = 0L
withContext(Dispatchers.IO) {
delegate.newIterator().use {
Expand Down Expand Up @@ -107,7 +106,7 @@ public class RocksdbPersistentMap(
key: String,
defaultValue: () -> String,
): String =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
delegate[key.prefixed()]?.decodeToString() ?: defaultValue().also { unsafePut(key, it) }
}
Expand All @@ -118,7 +117,7 @@ public class RocksdbPersistentMap(
value: String,
updater: (String) -> String,
): UpdateResult<String> =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
val previous = delegate[key.prefixed()]?.decodeToString()
val newValue = previous?.let(updater) ?: value
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package kotlinx.document.database.tests

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.document.database.DataStore
import kotlinx.document.database.getObjectCollection
import kotlinx.document.database.tests.TestUser.Companion.Luigi
import kotlinx.document.database.tests.TestUser.Companion.Mario
import kotlinx.document.database.updateWhere
import kotlin.js.JsName
import kotlin.test.Test
import kotlin.test.assertFails
import kotlin.time.Duration.Companion.seconds

abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store) {
@Test
Expand Down Expand Up @@ -34,4 +39,36 @@ abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store)
val collection = db.getObjectCollection<List<TestUser>>("test")
assertFails { collection.insert(listOf(Mario, Luigi)) }
}

@Test
@JsName("Concurrent_modification")
fun `Concurrent modification`() =
runDatabaseTest {
val collection = db.getObjectCollection<TestUser>("test")
collection.insert(Mario)

val mutex = Mutex(true)

launch {
collection.updateWhere(
TestUser::name.name,
Mario.name,
) {
mutex.lock()
it
}
}

launch {
delay(2.seconds)
mutex.unlock()
}

collection.updateWhere(
TestUser::name.name,
Mario.name,
) {
it
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kotlinx.document.database.tests

import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import kotlinx.document.database.DataStore
import kotlinx.document.database.KotlinxDocumentDatabase
Expand All @@ -15,11 +16,11 @@ abstract class BaseTest(store: DataStore) : DatabaseDeleter {
protected fun runDatabaseTest(
context: CoroutineContext = EmptyCoroutineContext,
timeout: Duration = 60.seconds,
testBody: suspend TestScope.() -> Unit,
testBody: suspend CoroutineScope.() -> Unit,
) = runTest(context, timeout) {
deleteDatabase()
try {
testBody()
coroutineScope(testBody)
} finally {
db.close()
deleteDatabase()
Expand Down

0 comments on commit 1f38ea4

Please sign in to comment.