Skip to content

Commit

Permalink
Merge pull request #1 from lamba92/fabrizio.scarponi/fix-concurrency-…
Browse files Browse the repository at this point in the history
…mutex-usage

Fix concurrency mutex usage
  • Loading branch information
fscarponi authored Sep 13, 2024
2 parents 02fd3fb + 1f38ea4 commit 93923c2
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class JsonCollection(
findUsingIndex(selector, value)
?: iterateAll().filter { it.select(selector) == value }

public suspend fun removeById(id: Long): JsonObject? = mutex.withLock(this) { removeByIdUnsafe(id) }
public suspend fun removeById(id: Long): JsonObject? = mutex.withLock { removeByIdUnsafe(id) }

private suspend fun removeByIdUnsafe(id: Long): JsonObject? {
val jsonString = collection.remove(id) ?: return null
Expand All @@ -116,7 +116,7 @@ public class JsonCollection(
return jsonObject
}

public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock(this) { insertUnsafe(value) }
public suspend fun insert(value: JsonObject): JsonObject = mutex.withLock { insertUnsafe(value) }

private suspend fun insertUnsafe(value: JsonObject): JsonObject {
val id = value.id ?: generateId()
Expand Down Expand Up @@ -149,7 +149,7 @@ public class JsonCollection(
?.toMap()

override suspend fun dropIndex(selector: String): Unit =
mutex.withLock(this) {
mutex.withLock {
store.deleteMap("$name.$selector")
indexMap.update(name, emptyList()) { it - selector }
}
Expand Down Expand Up @@ -188,7 +188,7 @@ public class JsonCollection(
fieldSelector: String,
fieldValue: JsonElement,
update: suspend (JsonObject) -> JsonObject,
): Boolean = mutex.withLock(this) { updateWhereUnsafe(fieldSelector, fieldValue, update) }
): Boolean = mutex.withLock { updateWhereUnsafe(fieldSelector, fieldValue, update) }

private suspend fun JsonCollection.updateWhereUnsafe(
fieldSelector: String,
Expand Down Expand Up @@ -218,7 +218,7 @@ public class JsonCollection(
upsert: Boolean,
update: JsonObject,
): Boolean =
mutex.withLock(this) {
mutex.withLock {
val updated = updateWhereUnsafe(fieldSelector, fieldValue) { update }
if (!updated && upsert) {
insertUnsafe(update)
Expand All @@ -231,7 +231,7 @@ public class JsonCollection(
fieldSelector: String,
fieldValue: JsonElement,
): Unit =
mutex.withLock(this) {
mutex.withLock {
val ids =
getIndexInternal(fieldSelector)
?.get(fieldValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
override suspend fun put(
key: String,
value: String,
) = mutex.withLock(this) { unsafePut(key, value) }
) = mutex.withLock { unsafePut(key, value) }

private suspend fun IndexedDBMap.unsafePut(
key: String,
Expand All @@ -58,7 +58,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
}

override suspend fun remove(key: String): String? =
mutex.withLock(this) {
mutex.withLock {
val previous = get(key)
keyval.del("${prefix}_$key").await()
previous
Expand All @@ -71,7 +71,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
value: String,
updater: (String) -> String,
): UpdateResult<String> =
mutex.withLock(this) {
mutex.withLock {
val oldValue = get(key)
val newValue = oldValue?.let(updater) ?: value
keyval.set("${prefix}_$key", newValue).await()
Expand All @@ -82,7 +82,7 @@ class IndexedDBMap(private val prefix: String) : PersistentMap<String, String> {
key: String,
defaultValue: () -> String,
): String =
mutex.withLock(this) {
mutex.withLock {
get(key) ?: defaultValue().also { unsafePut(key, it) }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.document.database.rocksdb

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
Expand Down Expand Up @@ -30,7 +29,7 @@ public class RocksdbPersistentMap(
override suspend fun put(
key: String,
value: String,
): String? = mutex.withLock(this) { unsafePut(key, value) }
): String? = mutex.withLock { unsafePut(key, value) }

private suspend fun unsafePut(
key: String,
Expand All @@ -44,7 +43,7 @@ public class RocksdbPersistentMap(
}

override suspend fun remove(key: String): String? =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
val prefixed = key.prefixed()
val previous = delegate[prefixed]?.decodeToString()
Expand All @@ -58,7 +57,7 @@ public class RocksdbPersistentMap(
override suspend fun clear(): Unit = delegate.deletePrefix(prefix)

override suspend fun size(): Long =
mutex.withLock(this) {
mutex.withLock {
var count = 0L
withContext(Dispatchers.IO) {
delegate.newIterator().use {
Expand Down Expand Up @@ -107,7 +106,7 @@ public class RocksdbPersistentMap(
key: String,
defaultValue: () -> String,
): String =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
delegate[key.prefixed()]?.decodeToString() ?: defaultValue().also { unsafePut(key, it) }
}
Expand All @@ -118,7 +117,7 @@ public class RocksdbPersistentMap(
value: String,
updater: (String) -> String,
): UpdateResult<String> =
mutex.withLock(this) {
mutex.withLock {
withContext(Dispatchers.IO) {
val previous = delegate[key.prefixed()]?.decodeToString()
val newValue = previous?.let(updater) ?: value
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package kotlinx.document.database.tests

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.document.database.DataStore
import kotlinx.document.database.getObjectCollection
import kotlinx.document.database.tests.TestUser.Companion.Luigi
import kotlinx.document.database.tests.TestUser.Companion.Mario
import kotlinx.document.database.updateWhere
import kotlin.js.JsName
import kotlin.test.Test
import kotlin.test.assertFails
import kotlin.time.Duration.Companion.seconds

abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store) {
@Test
Expand Down Expand Up @@ -34,4 +39,36 @@ abstract class AbstractObjectCollectionTests(store: DataStore) : BaseTest(store)
val collection = db.getObjectCollection<List<TestUser>>("test")
assertFails { collection.insert(listOf(Mario, Luigi)) }
}

@Test
@JsName("Concurrent_modification")
fun `Concurrent modification`() =
runDatabaseTest {
val collection = db.getObjectCollection<TestUser>("test")
collection.insert(Mario)

val mutex = Mutex(true)

launch {
collection.updateWhere(
TestUser::name.name,
Mario.name,
) {
mutex.lock()
it
}
}

launch {
delay(2.seconds)
mutex.unlock()
}

collection.updateWhere(
TestUser::name.name,
Mario.name,
) {
it
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kotlinx.document.database.tests

import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import kotlinx.document.database.DataStore
import kotlinx.document.database.KotlinxDocumentDatabase
Expand All @@ -15,11 +16,11 @@ abstract class BaseTest(store: DataStore) : DatabaseDeleter {
protected fun runDatabaseTest(
context: CoroutineContext = EmptyCoroutineContext,
timeout: Duration = 60.seconds,
testBody: suspend TestScope.() -> Unit,
testBody: suspend CoroutineScope.() -> Unit,
) = runTest(context, timeout) {
deleteDatabase()
try {
testBody()
coroutineScope(testBody)
} finally {
db.close()
deleteDatabase()
Expand Down

0 comments on commit 93923c2

Please sign in to comment.