From 1f38ea4fcbeaffe291e214dc49b0b9a0eaa0398d Mon Sep 17 00:00:00 2001 From: fscarponi Date: Fri, 13 Sep 2024 15:27:55 +0200 Subject: [PATCH] Fix concurrency mutex usage Replace the usage of 'mutex.withLock(this)' with 'mutex.withLock' in multiple files, preventing concurrency issues. This change affects the IndexedDBStore, RocksdbPersistentMap, JsonCollection implementations, and relevant test cases. --- .../document/database/JsonCollection.kt | 12 +++--- .../database/browser/IndexedDBStore.kt | 8 ++-- .../database/rocksdb/RocksdbPersistentMap.kt | 11 +++--- .../tests/AbstractObjectCollectionTests.kt | 37 +++++++++++++++++++ .../document/database/tests/BaseTest.kt | 7 ++-- 5 files changed, 56 insertions(+), 19 deletions(-) diff --git a/core/src/commonMain/kotlin/kotlinx/document/database/JsonCollection.kt b/core/src/commonMain/kotlin/kotlinx/document/database/JsonCollection.kt index da53705..95be53e 100644 --- a/core/src/commonMain/kotlin/kotlinx/document/database/JsonCollection.kt +++ b/core/src/commonMain/kotlin/kotlinx/document/database/JsonCollection.kt @@ -97,7 +97,7 @@ public class JsonCollection( findUsingIndex(selector, value) ?: iterateAll().filter { it.select(selector) == value } - public suspend fun removeById(id: Long): JsonObject? = mutex.withLock(this) { removeByIdUnsafe(id) } + public suspend fun removeById(id: Long): JsonObject? = mutex.withLock { removeByIdUnsafe(id) } private suspend fun removeByIdUnsafe(id: Long): JsonObject? { val jsonString = collection.remove(id) ?: return null @@ -116,7 +116,7 @@ public class JsonCollection( return jsonObject } - public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock(this) { insertUnsafe(value) } + public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock { insertUnsafe(value) } private suspend fun insertUnsafe(value: JsonObject): JsonObject { val id = value.id ?: generateId() @@ -149,7 +149,7 @@ public class JsonCollection( ?.toMap() override suspend fun dropIndex(selector: String): Unit = - mutex.withLock(this) { + mutex.withLock { store.deleteMap("$name.$selector") indexMap.update(name, emptyList()) { it - selector } } @@ -188,7 +188,7 @@ public class JsonCollection( fieldSelector: String, fieldValue: JsonElement, update: suspend (JsonObject) -> JsonObject, - ): Boolean = mutex.withLock(this) { updateWhereUnsafe(fieldSelector, fieldValue, update) } + ): Boolean = mutex.withLock { updateWhereUnsafe(fieldSelector, fieldValue, update) } private suspend fun JsonCollection.updateWhereUnsafe( fieldSelector: String, @@ -218,7 +218,7 @@ public class JsonCollection( upsert: Boolean, update: JsonObject, ): Boolean = - mutex.withLock(this) { + mutex.withLock { val updated = updateWhereUnsafe(fieldSelector, fieldValue) { update } if (!updated && upsert) { insertUnsafe(update) @@ -231,7 +231,7 @@ public class JsonCollection( fieldSelector: String, fieldValue: JsonElement, ): Unit = - mutex.withLock(this) { + mutex.withLock { val ids = getIndexInternal(fieldSelector) ?.get(fieldValue) diff --git a/stores/browser/src/jsMain/kotlin/kotlinx/document/database/browser/IndexedDBStore.kt b/stores/browser/src/jsMain/kotlin/kotlinx/document/database/browser/IndexedDBStore.kt index 7a6e615..18f1767 100644 --- a/stores/browser/src/jsMain/kotlin/kotlinx/document/database/browser/IndexedDBStore.kt +++ b/stores/browser/src/jsMain/kotlin/kotlinx/document/database/browser/IndexedDBStore.kt @@ -46,7 +46,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap { override suspend fun put( key: String, value: String, - ) = mutex.withLock(this) { unsafePut(key, value) } + ) = mutex.withLock { unsafePut(key, value) } private suspend fun IndexedDBMap.unsafePut( key: String, @@ -58,7 +58,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap { } override suspend fun remove(key: String): String? = - mutex.withLock(this) { + mutex.withLock { val previous = get(key) keyval.del("${prefix}_$key").await() previous @@ -71,7 +71,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap { value: String, updater: (String) -> String, ): UpdateResult = - mutex.withLock(this) { + mutex.withLock { val oldValue = get(key) val newValue = oldValue?.let(updater) ?: value keyval.set("${prefix}_$key", newValue).await() @@ -82,7 +82,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap { key: String, defaultValue: () -> String, ): String = - mutex.withLock(this) { + mutex.withLock { get(key) ?: defaultValue().also { unsafePut(key, it) } } diff --git a/stores/rocksdb/src/commonMain/kotlin/kotlinx/document/database/rocksdb/RocksdbPersistentMap.kt b/stores/rocksdb/src/commonMain/kotlin/kotlinx/document/database/rocksdb/RocksdbPersistentMap.kt index c15f80c..fe32a64 100644 --- a/stores/rocksdb/src/commonMain/kotlin/kotlinx/document/database/rocksdb/RocksdbPersistentMap.kt +++ b/stores/rocksdb/src/commonMain/kotlin/kotlinx/document/database/rocksdb/RocksdbPersistentMap.kt @@ -1,7 +1,6 @@ package kotlinx.document.database.rocksdb import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn @@ -30,7 +29,7 @@ public class RocksdbPersistentMap( override suspend fun put( key: String, value: String, - ): String? = mutex.withLock(this) { unsafePut(key, value) } + ): String? = mutex.withLock { unsafePut(key, value) } private suspend fun unsafePut( key: String, @@ -44,7 +43,7 @@ public class RocksdbPersistentMap( } override suspend fun remove(key: String): String? = - mutex.withLock(this) { + mutex.withLock { withContext(Dispatchers.IO) { val prefixed = key.prefixed() val previous = delegate[prefixed]?.decodeToString() @@ -58,7 +57,7 @@ public class RocksdbPersistentMap( override suspend fun clear(): Unit = delegate.deletePrefix(prefix) override suspend fun size(): Long = - mutex.withLock(this) { + mutex.withLock { var count = 0L withContext(Dispatchers.IO) { delegate.newIterator().use { @@ -107,7 +106,7 @@ public class RocksdbPersistentMap( key: String, defaultValue: () -> String, ): String = - mutex.withLock(this) { + mutex.withLock { withContext(Dispatchers.IO) { delegate[key.prefixed()]?.decodeToString() ?: defaultValue().also { unsafePut(key, it) } } @@ -118,7 +117,7 @@ public class RocksdbPersistentMap( value: String, updater: (String) -> String, ): UpdateResult = - mutex.withLock(this) { + mutex.withLock { withContext(Dispatchers.IO) { val previous = delegate[key.prefixed()]?.decodeToString() val newValue = previous?.let(updater) ?: value diff --git a/tests/src/commonMain/kotlin/kotlinx/document/database/tests/AbstractObjectCollectionTests.kt b/tests/src/commonMain/kotlin/kotlinx/document/database/tests/AbstractObjectCollectionTests.kt index 7d014bd..ccc35ef 100644 --- a/tests/src/commonMain/kotlin/kotlinx/document/database/tests/AbstractObjectCollectionTests.kt +++ b/tests/src/commonMain/kotlin/kotlinx/document/database/tests/AbstractObjectCollectionTests.kt @@ -1,12 +1,17 @@ package kotlinx.document.database.tests +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.document.database.DataStore import kotlinx.document.database.getObjectCollection import kotlinx.document.database.tests.TestUser.Companion.Luigi import kotlinx.document.database.tests.TestUser.Companion.Mario +import kotlinx.document.database.updateWhere import kotlin.js.JsName import kotlin.test.Test import kotlin.test.assertFails +import kotlin.time.Duration.Companion.seconds abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store) { @Test @@ -34,4 +39,36 @@ abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store) val collection = db.getObjectCollection>("test") assertFails { collection.insert(listOf(Mario, Luigi)) } } + + @Test + @JsName("Concurrent_modification") + fun `Concurrent modification`() = + runDatabaseTest { + val collection = db.getObjectCollection("test") + collection.insert(Mario) + + val mutex = Mutex(true) + + launch { + collection.updateWhere( + TestUser::name.name, + Mario.name, + ) { + mutex.lock() + it + } + } + + launch { + delay(2.seconds) + mutex.unlock() + } + + collection.updateWhere( + TestUser::name.name, + Mario.name, + ) { + it + } + } } diff --git a/tests/src/commonMain/kotlin/kotlinx/document/database/tests/BaseTest.kt b/tests/src/commonMain/kotlin/kotlinx/document/database/tests/BaseTest.kt index 4ae0a7c..02d45fd 100644 --- a/tests/src/commonMain/kotlin/kotlinx/document/database/tests/BaseTest.kt +++ b/tests/src/commonMain/kotlin/kotlinx/document/database/tests/BaseTest.kt @@ -1,6 +1,7 @@ package kotlinx.document.database.tests -import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.test.runTest import kotlinx.document.database.DataStore import kotlinx.document.database.KotlinxDocumentDatabase @@ -15,11 +16,11 @@ abstract class BaseTest(store: DataStore) : DatabaseDeleter { protected fun runDatabaseTest( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration = 60.seconds, - testBody: suspend TestScope.() -> Unit, + testBody: suspend CoroutineScope.() -> Unit, ) = runTest(context, timeout) { deleteDatabase() try { - testBody() + coroutineScope(testBody) } finally { db.close() deleteDatabase()