diff --git a/pom.xml b/pom.xml index e12c402..55f4c1b 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,6 @@ 10.17.3 1.7.2 2.17.2 - 1.20240905.231604 @@ -166,12 +165,6 @@ ${jackson.version} test - - no.liflig - snapshot-test - ${liflig-snapshot-test.version} - test - diff --git a/src/main/kotlin/no/liflig/documentstore/repository/Repository.kt b/src/main/kotlin/no/liflig/documentstore/repository/Repository.kt index 7514c24..5b898e8 100644 --- a/src/main/kotlin/no/liflig/documentstore/repository/Repository.kt +++ b/src/main/kotlin/no/liflig/documentstore/repository/Repository.kt @@ -1,16 +1,9 @@ -@file:Suppress("MemberVisibilityCanBePrivate") // This is a library, we want to expose fields - package no.liflig.documentstore.repository -import java.time.Instant import no.liflig.documentstore.entity.Entity import no.liflig.documentstore.entity.EntityId import no.liflig.documentstore.entity.Version import no.liflig.documentstore.entity.Versioned -import no.liflig.documentstore.entity.getEntityIdType -import org.jdbi.v3.core.Jdbi -import org.jdbi.v3.core.mapper.RowMapper -import org.jdbi.v3.core.statement.Query /** Interface for interacting with entities in a database table. */ interface Repository> { @@ -52,334 +45,86 @@ interface Repository> { fun listByIds(ids: List): List> fun listAll(): List> -} - -/** - * An implementation of [Repository] that uses the [JDBI library](https://jdbi.org/) for database - * access. - * - * Provides default implementations for CRUD (Create, Read, Update, Delete), as well as `listByIds` - * and `listAll`. To implement filtering on specific fields on your entities, you can extend this - * and use the [getByPredicate] utility method to provide your own WHERE clause (or - * [getByPredicateWithTotalCount] for better pagination support). - * - * The implementation assumes that the table has the following columns: - * ```sql - * CREATE TABLE example - * ( - * -- Can have type `text` if using `StringEntityId` - * id uuid NOT NULL PRIMARY KEY, - * created_at timestamptz NOT NULL, - * modified_at timestamptz NOT NULL, - * version bigint NOT NULL, - * data jsonb NOT NULL - * ); - * ``` - * - * Also, the given [Jdbi] instance must have the - * [DocumentStorePlugin][no.liflig.documentstore.DocumentStorePlugin] installed for the queries in - * this class to work. - */ -open class RepositoryJdbi>( - protected val jdbi: Jdbi, - protected val tableName: String, - protected val serializationAdapter: SerializationAdapter, -) : Repository { - protected val rowMapper: RowMapper> = - createRowMapper(serializationAdapter::fromJson) - - private val rowMapperWithTotalCount = - createRowMapperWithTotalCount(serializationAdapter::fromJson) - - private val updateResultMapper = createUpdateResultMapper() - - override fun create(entity: EntityT): Versioned { - try { - useHandle(jdbi) { handle -> - val now = Instant.now() - val version = Version.initial() - handle - .createUpdate( - """ - INSERT INTO "${tableName}" (id, data, version, created_at, modified_at) - VALUES (:id, :data::jsonb, :version, :createdAt, :modifiedAt) - """ - .trimIndent(), - ) - .bind("id", entity.id) - .bind("data", serializationAdapter.toJson(entity)) - .bind("version", version) - .bind("createdAt", now) - .bind("modifiedAt", now) - .execute() - return Versioned(entity, version, createdAt = now, modifiedAt = now) - } - } catch (e: Exception) { - // Call mapDatabaseException first to handle connection-related exceptions, before calling - // mapCreateOrUpdateException (which may be overridden by users for custom error handling). - throw mapCreateOrUpdateException(mapDatabaseException(e), entity) - } - } - - override fun get(id: EntityIdT, forUpdate: Boolean): Versioned? { - useHandle(jdbi) { handle -> - return handle - .createQuery( - """ - SELECT id, data, version, created_at, modified_at - FROM "${tableName}" - WHERE id = :id - ORDER BY created_at - ${if (forUpdate) " FOR UPDATE" else ""} - """ - .trimIndent(), - ) - .bind("id", id) - .map(rowMapper) - .firstOrNull() - } - } - - override fun update( - entity: EntityOrSubClassT, - previousVersion: Version - ): Versioned { - try { - useHandle(jdbi) { handle -> - val nextVersion = previousVersion.next() - val modifiedAt = Instant.now() - val updateResult = - handle - .createQuery( - /** See [UpdateResult] for why we use RETURNING here. */ - """ - UPDATE "${tableName}" - SET - version = :nextVersion, - data = :data::jsonb, - modified_at = :modifiedAt - WHERE - id = :id AND - version = :previousVersion - RETURNING - created_at - """ - .trimIndent(), - ) - .bind("nextVersion", nextVersion) - .bind("data", serializationAdapter.toJson(entity)) - .bind("id", entity.id) - .bind("modifiedAt", modifiedAt) - .bind("previousVersion", previousVersion) - .map(updateResultMapper) - .firstOrNull() - - if (updateResult == null) { - throw ConflictRepositoryException( - "Entity with ID '${entity.id}' was concurrently modified between being retrieved and trying to update it here", - ) - } - - return Versioned( - entity, - nextVersion, - createdAt = updateResult.created_at, - modifiedAt = modifiedAt, - ) - } - } catch (e: Exception) { - // Call mapDatabaseException first to handle connection-related exceptions, before calling - // mapCreateOrUpdateException (which may be overridden by users for custom error handling). - throw mapCreateOrUpdateException(mapDatabaseException(e), entity) - } - } - - override fun delete(id: EntityIdT, previousVersion: Version) { - useHandle(jdbi) { handle -> - val deleted = - handle - .createUpdate( - """ - DELETE FROM "${tableName}" - WHERE id = :id AND version = :previousVersion - """ - .trimIndent(), - ) - .bind("id", id) - .bind("previousVersion", previousVersion) - .execute() - if (deleted == 0) { - throw ConflictRepositoryException( - "Entity with ID '${id}' was concurrently modified between being retrieved and trying to delete it here", - ) - } - } - } - - override fun listByIds(ids: List): List> { - if (ids.isEmpty()) { - return emptyList() - } - - /** See [getEntityIdType]. */ - val elementType = getEntityIdType(ids.first()) - return getByPredicate("id = ANY (:ids)") { bindArray("ids", elementType, ids) } - } - - override fun listAll(): List> { - return getByPredicate() // Defaults to all + /** + * Stores the given list of entities in the database. + * + * The implementation in [RepositoryJdbi.batchCreate] uses + * [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to make the + * implementation as efficient as possible. + */ + fun batchCreate(entities: List): List> { + // A default implementation is provided here on the interface, so that implementors don't have + // to implement this themselves (for e.g. mock repositories). + return entities.map { entity -> create(entity) } } /** - * Runs a SELECT query using the given WHERE clause, limit, offset etc. - * - * When using parameters in [sqlWhere], you must remember to bind them through the [bind] function - * argument (do not concatenate user input directly in [sqlWhere], as that exposes you to SQL - * injections). When using a list parameter, use the [Query.bindArray] method and pass the class - * of the list's elements as the second argument (required for JDBI's reflection to work - see - * example below). + * Takes a list of entities along with their previous [Version], to implement optimistic locking + * like [update]. Returns the updated entities along with their new version. * - * [sqlWhere] will typically use Postgres JSON operators to filter on entity fields, since the - * document store uses `jsonb` to store entities (see - * [Postgres docs](https://www.postgresql.org/docs/16/functions-json.html)). + * The implementation in [RepositoryJdbi.batchUpdate] uses + * [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to make the + * implementation as efficient as possible. * - * Example implementing a query where we want to look up users from a list of names: - * ``` - * fun getByNames(names: List): List> { - * return getByPredicate("data->>'name' = ANY(:names)") { - * bindArray("names", String::class.java, names) - * } - * } - * ``` + * @throws ConflictRepositoryException If the version on any of the given entities does not match + * the current version of the entity in the database. */ - protected open fun getByPredicate( - sqlWhere: String = "TRUE", - limit: Int? = null, - offset: Int? = null, - orderBy: String? = null, - orderDesc: Boolean = false, - forUpdate: Boolean = false, - bind: Query.() -> Query = { this } - ): List> { - useHandle(jdbi) { handle -> - val orderDirection = if (orderDesc) "DESC" else "ASC" - val orderByString = orderBy ?: "created_at" - - val limitString = if (limit != null) "LIMIT ${limit}" else "" - val offsetString = if (offset != null) "OFFSET ${offset}" else "" - val forUpdateString = if (forUpdate) " FOR UPDATE" else "" - - return handle - .createQuery( - """ - SELECT id, data, version, created_at, modified_at - FROM "${tableName}" - WHERE (${sqlWhere}) - ORDER BY ${orderByString} ${orderDirection} - ${limitString} - ${offsetString} - ${forUpdateString} - """ - .trimIndent(), - ) - .bind() - .map(rowMapper) - .list() - } + fun batchUpdate(entities: List>): List> { + // A default implementation is provided here on the interface, so that implementors don't have + // to implement this themselves (for e.g. mock repositories). + return entities.map { entity -> update(entity.item, entity.version) } } /** - * Gets database objects matching the given parameters, and the total count of objects matching - * the WHERE clause without `limit`. + * Takes a list of entities along with their previous [Version], to implement optimistic locking + * like [delete]. * - * This can be used for pagination: for example, if passing e.g. `limit = 10` to display 10 items - * in a page at a time, the total count can be used to display the number of pages. + * The implementation in [RepositoryJdbi.batchDelete] uses + * [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to make the + * implementation as efficient as possible. * - * See [getByPredicate] for further documentation. + * @throws ConflictRepositoryException If the version on any of the given entities does not match + * the current version of the entity in the database. */ - protected open fun getByPredicateWithTotalCount( - sqlWhere: String = "TRUE", - limit: Int? = null, - offset: Int? = null, - orderBy: String? = null, - orderDesc: Boolean = false, - bind: Query.() -> Query = { this } - ): ListWithTotalCount> { - useHandle(jdbi) { handle -> - val limitString = limit?.let { "LIMIT $it" } ?: "" - val offsetString = offset?.let { "OFFSET $it" } ?: "" - val orderDirection = if (orderDesc) "DESC" else "ASC" - val orderByString = orderBy ?: "created_at" - - val rows = - handle - .createQuery( - // SQL query based on https://stackoverflow.com/a/28888696 - // Uses a RIGHT JOIN with the count in order to still get the count when no rows - // are returned. - """ - WITH base_query AS ( - SELECT id, data, version, created_at, modified_at - FROM "${tableName}" - WHERE (${sqlWhere}) - ) - SELECT id, data, version, created_at, modified_at, count - FROM ( - TABLE base_query - ORDER BY ${orderByString} ${orderDirection} - ${limitString} - ${offsetString} - ) sub_query - RIGHT JOIN ( - SELECT count(*) FROM base_query - ) c(count) ON true - """ - .trimIndent(), - ) - .bind() - .map(rowMapperWithTotalCount) - .list() - - val entities = rows.mapNotNull { row -> row.entity } - val count = - rows.firstOrNull()?.count - /** - * Should never happen: the query should always return 1 row with the count, even if the - * results are empty (see [EntityRowWithTotalCount]). - */ - ?: throw IllegalStateException("Failed to get total count of objects in search query") - - return ListWithTotalCount(entities, count) + fun batchDelete(entities: List>) { + // A default implementation is provided here on the interface, so that implementors don't have + // to implement this themselves (for e.g. mock repositories). + for (entity in entities) { + delete(entity.item.id, entity.version) } } /** - * Method that you can override to map exceptions thrown in [create] or [update] to your own - * exception type. This is useful to handle e.g. unique constraint violations: instead of letting - * the database throw an opaque `PSQLException` that may be difficult to handle in layers above, - * you can instead check if the given exception is a unique index violation and map it to a more - * useful exception type here. + * When using a document store, the application must take care to stay backwards-compatible with + * entities that are stored in the database. Thus, new fields added to entitites must always have + * a default value, so that entities stored before the field was added can still be deserialized. * - * If your implementation receives an exception here that it does not want to map, it should just - * return it as-is. + * Sometimes we may add a field with a default value, but also want to query on that field in e.g. + * [RepositoryJdbi.getByPredicate]. In this case, it's not enough to add a default value to the + * field that is populated on deserializing from the database - we actually have to migrate the + * stored entity. This method exists for those cases. * - * The entity that was attempted to be created or updated is also provided here, so you can add - * extra context to the mapped exception. + * In the implementation for [RepositoryJdbi.migrate], all entities are read from the database + * table, in a streaming fashion to avoid reading them all into memory. It then updates the + * entities in batches (see [OPTIMAL_BATCH_SIZE]). * - * Example: - * ``` - * override fun mapCreateOrUpdateException(e: Exception, entity: ExampleEntity): Exception { - * val message = e.message - * if (message != null && message.contains("example_unique_field_index")) { - * return UniqueFieldAlreadyExists(entity, cause = e) - * } + * It is important that the migration is done in an idempotent manner, i.e. that it may be + * executed repeatedly with the same results. This is because we call this method from application + * code, and if for example there are multiple instances of the service running, [migrate] will be + * called by each one. * - * return e - * } - * ``` + * Any new fields on the entity with default values will be stored in the database through the + * process of deserializing and re-serializing here. If you want to do further transforms, you can + * use the [transformEntity] parameter. */ - protected open fun mapCreateOrUpdateException(e: Exception, entity: EntityT): Exception { - return e + fun migrate(transformEntity: ((Versioned) -> EntityT)? = null) { + // A default implementation is provided here on the interface, so that implementors don't have + // to implement this themselves (for e.g. mock repositories). + var entities = listAll() + if (transformEntity != null) { + entities = entities.map { entity -> entity.copy(item = transformEntity(entity)) } + } + batchUpdate(entities) } } diff --git a/src/main/kotlin/no/liflig/documentstore/repository/RepositoryJdbi.kt b/src/main/kotlin/no/liflig/documentstore/repository/RepositoryJdbi.kt new file mode 100644 index 0000000..6560d23 --- /dev/null +++ b/src/main/kotlin/no/liflig/documentstore/repository/RepositoryJdbi.kt @@ -0,0 +1,553 @@ +@file:Suppress("MemberVisibilityCanBePrivate") // This is a library, we want to expose fields + +package no.liflig.documentstore.repository + +import java.time.Instant +import no.liflig.documentstore.entity.Entity +import no.liflig.documentstore.entity.EntityId +import no.liflig.documentstore.entity.Version +import no.liflig.documentstore.entity.Versioned +import no.liflig.documentstore.entity.getEntityIdType +import org.jdbi.v3.core.Jdbi +import org.jdbi.v3.core.mapper.RowMapper +import org.jdbi.v3.core.statement.PreparedBatch +import org.jdbi.v3.core.statement.Query + +/** + * An implementation of [Repository] that uses the [JDBI library](https://jdbi.org/) for database + * access. + * + * Provides default implementations for CRUD (Create, Read, Update, Delete), as well as `listByIds` + * and `listAll`. To implement filtering on specific fields on your entities, you can extend this + * and use the [getByPredicate] utility method to provide your own WHERE clause (or + * [getByPredicateWithTotalCount] for better pagination support). + * + * The implementation assumes that the table has the following columns: + * ```sql + * CREATE TABLE example + * ( + * -- Can have type `text` if using `StringEntityId` + * id uuid NOT NULL PRIMARY KEY, + * created_at timestamptz NOT NULL, + * modified_at timestamptz NOT NULL, + * version bigint NOT NULL, + * data jsonb NOT NULL + * ); + * ``` + * + * Also, the given [Jdbi] instance must have the + * [DocumentStorePlugin][no.liflig.documentstore.DocumentStorePlugin] installed for the queries in + * this class to work. + */ +open class RepositoryJdbi>( + protected val jdbi: Jdbi, + protected val tableName: String, + protected val serializationAdapter: SerializationAdapter, +) : Repository { + protected val rowMapper: RowMapper> = + createRowMapper(serializationAdapter::fromJson) + + private val rowMapperWithTotalCount = + createRowMapperWithTotalCount(serializationAdapter::fromJson) + + private val updateResultMapper = createUpdateResultMapper() + + override fun create(entity: EntityT): Versioned { + try { + useHandle(jdbi) { handle -> + val now = Instant.now() + val version = Version.initial() + handle + .createUpdate( + """ + INSERT INTO "${tableName}" (id, data, version, created_at, modified_at) + VALUES (:id, :data::jsonb, :version, :createdAt, :modifiedAt) + """ + .trimIndent(), + ) + .bind("id", entity.id) + .bind("data", serializationAdapter.toJson(entity)) + .bind("version", version) + .bind("createdAt", now) + .bind("modifiedAt", now) + .execute() + return Versioned(entity, version, createdAt = now, modifiedAt = now) + } + } catch (e: Exception) { + // Call mapDatabaseException first to handle connection-related exceptions, before calling + // mapCreateOrUpdateException (which may be overridden by users for custom error handling). + throw mapCreateOrUpdateException(mapDatabaseException(e), entity) + } + } + + override fun get(id: EntityIdT, forUpdate: Boolean): Versioned? { + useHandle(jdbi) { handle -> + return handle + .createQuery( + """ + SELECT id, data, version, created_at, modified_at + FROM "${tableName}" + WHERE id = :id + ORDER BY created_at + ${if (forUpdate) " FOR UPDATE" else ""} + """ + .trimIndent(), + ) + .bind("id", id) + .map(rowMapper) + .firstOrNull() + } + } + + override fun update( + entity: EntityOrSubClassT, + previousVersion: Version + ): Versioned { + try { + useHandle(jdbi) { handle -> + val nextVersion = previousVersion.next() + val modifiedAt = Instant.now() + val updateResult = + handle + .createQuery( + /** See [UpdateResult] for why we use RETURNING here. */ + """ + UPDATE "${tableName}" + SET + version = :nextVersion, + data = :data::jsonb, + modified_at = :modifiedAt + WHERE + id = :id AND + version = :previousVersion + RETURNING + created_at + """ + .trimIndent(), + ) + .bind("nextVersion", nextVersion) + .bind("data", serializationAdapter.toJson(entity)) + .bind("id", entity.id) + .bind("modifiedAt", modifiedAt) + .bind("previousVersion", previousVersion) + .map(updateResultMapper) + .firstOrNull() + + if (updateResult == null) { + throw ConflictRepositoryException( + "Entity with ID '${entity.id}' was concurrently modified between being retrieved and trying to update it here", + ) + } + + return Versioned( + entity, + nextVersion, + createdAt = updateResult.created_at, + modifiedAt = modifiedAt, + ) + } + } catch (e: Exception) { + // Call mapDatabaseException first to handle connection-related exceptions, before calling + // mapCreateOrUpdateException (which may be overridden by users for custom error handling). + throw mapCreateOrUpdateException(mapDatabaseException(e), entity) + } + } + + override fun delete(id: EntityIdT, previousVersion: Version) { + useHandle(jdbi) { handle -> + val deleted = + handle + .createUpdate( + """ + DELETE FROM "${tableName}" + WHERE id = :id AND version = :previousVersion + """ + .trimIndent(), + ) + .bind("id", id) + .bind("previousVersion", previousVersion) + .execute() + + if (deleted == 0) { + throw ConflictRepositoryException( + "Entity with ID '${id}' was concurrently modified between being retrieved and trying to delete it here", + ) + } + } + } + + override fun listByIds(ids: List): List> { + if (ids.isEmpty()) { + return emptyList() + } + + /** See [getEntityIdType]. */ + val elementType = getEntityIdType(ids.first()) + return getByPredicate("id = ANY (:ids)") { bindArray("ids", elementType, ids) } + } + + override fun listAll(): List> { + return getByPredicate() // Defaults to all + } + + override fun batchCreate(entities: List): List> { + val createdEntities = ArrayList>(entities.size) // Initialize with capacity + val now = Instant.now() + val version = Version.initial() + + executeBatchOperation( + entities, + statement = + """ + INSERT INTO "${tableName}" (id, data, version, created_at, modified_at) + VALUES (:id, :data::jsonb, :version, :createdAt, :modifiedAt) + """ + .trimIndent(), + bindParameters = { batch, entity -> + createdEntities.add(Versioned(entity, version, createdAt = now, modifiedAt = now)) + + batch + .bind("id", entity.id) + .bind("data", serializationAdapter.toJson(entity)) + .bind("version", version) + .bind("createdAt", now) + .bind("modifiedAt", now) + }, + ) + + return createdEntities + } + + override fun batchUpdate(entities: List>): List> { + val updatedEntities = ArrayList>(entities.size) // Initialize with capacity + val now = Instant.now() + + executeBatchOperation( + entities, + statement = + """ + UPDATE "${tableName}" + SET + version = :nextVersion, + data = :data::jsonb, + modified_at = :modifiedAt + WHERE + id = :id AND + version = :previousVersion + """ + .trimIndent(), + bindParameters = { batch, entity -> + val nextVersion = entity.version.next() + updatedEntities.add(entity.copy(version = nextVersion, modifiedAt = now)) + + batch + .bind("nextVersion", nextVersion) + .bind("data", serializationAdapter.toJson(entity.item)) + .bind("modifiedAt", now) + .bind("id", entity.item.id) + .bind("previousVersion", entity.version) + }, + handleModifiedRowCounts = { modifiedRowCounts, startIndexOfCurrentBatch -> + for (count in modifiedRowCounts.withIndex()) { + if (count.value == 0) { + val entity = entities[startIndexOfCurrentBatch + count.index] + throw ConflictRepositoryException( + "Entity with ID '${entity.item.id}' was concurrently modified between being retrieved and trying to update it in batch update", + ) + } + } + }, + ) + + return updatedEntities + } + + override fun batchDelete(entities: List>) { + executeBatchOperation( + entities, + statement = + """ + DELETE FROM "${tableName}" + WHERE id = :id AND version = :previousVersion + """ + .trimIndent(), + bindParameters = { batch, entity -> + batch.bind("id", entity.item.id).bind("previousVersion", entity.version) + }, + handleModifiedRowCounts = { modifiedRowCounts, startIndexOfCurrentBatch -> + for (count in modifiedRowCounts.withIndex()) { + if (count.value == 0) { + val entity = entities[startIndexOfCurrentBatch + count.index] + throw ConflictRepositoryException( + "Entity with ID '${entity.item.id}' was concurrently modified between being retrieved and trying to delete it in batch delete", + ) + } + } + }, + ) + } + + /** + * Runs a SELECT query using the given WHERE clause, limit, offset etc. + * + * When using parameters in [sqlWhere], you must remember to bind them through the [bind] function + * argument (do not concatenate user input directly in [sqlWhere], as that exposes you to SQL + * injections). When using a list parameter, use the [Query.bindArray] method and pass the class + * of the list's elements as the second argument (required for JDBI's reflection to work - see + * example below). + * + * [sqlWhere] will typically use Postgres JSON operators to filter on entity fields, since the + * document store uses `jsonb` to store entities (see + * [Postgres docs](https://www.postgresql.org/docs/16/functions-json.html)). + * + * Example implementing a query where we want to look up users from a list of names: + * ``` + * fun getByNames(names: List): List> { + * return getByPredicate("data->>'name' = ANY(:names)") { + * bindArray("names", String::class.java, names) + * } + * } + * ``` + */ + protected open fun getByPredicate( + sqlWhere: String = "TRUE", + limit: Int? = null, + offset: Int? = null, + orderBy: String? = null, + orderDesc: Boolean = false, + forUpdate: Boolean = false, + bind: Query.() -> Query = { this } + ): List> { + useHandle(jdbi) { handle -> + val orderDirection = if (orderDesc) "DESC" else "ASC" + val orderByString = orderBy ?: "created_at" + + val limitString = if (limit != null) "LIMIT ${limit}" else "" + val offsetString = if (offset != null) "OFFSET ${offset}" else "" + val forUpdateString = if (forUpdate) " FOR UPDATE" else "" + + return handle + .createQuery( + """ + SELECT id, data, version, created_at, modified_at + FROM "${tableName}" + WHERE (${sqlWhere}) + ORDER BY ${orderByString} ${orderDirection} + ${limitString} + ${offsetString} + ${forUpdateString} + """ + .trimIndent(), + ) + .bind() + .map(rowMapper) + .list() + } + } + + /** + * Gets database objects matching the given parameters, and the total count of objects matching + * the WHERE clause without `limit`. + * + * This can be used for pagination: for example, if passing e.g. `limit = 10` to display 10 items + * in a page at a time, the total count can be used to display the number of pages. + * + * See [getByPredicate] for further documentation. + */ + protected open fun getByPredicateWithTotalCount( + sqlWhere: String = "TRUE", + limit: Int? = null, + offset: Int? = null, + orderBy: String? = null, + orderDesc: Boolean = false, + bind: Query.() -> Query = { this } + ): ListWithTotalCount> { + useHandle(jdbi) { handle -> + val limitString = limit?.let { "LIMIT $it" } ?: "" + val offsetString = offset?.let { "OFFSET $it" } ?: "" + val orderDirection = if (orderDesc) "DESC" else "ASC" + val orderByString = orderBy ?: "created_at" + + val rows = + handle + .createQuery( + // SQL query based on https://stackoverflow.com/a/28888696 + // Uses a RIGHT JOIN with the count in order to still get the count when no rows + // are returned. + """ + WITH base_query AS ( + SELECT id, data, version, created_at, modified_at + FROM "${tableName}" + WHERE (${sqlWhere}) + ) + SELECT id, data, version, created_at, modified_at, count + FROM ( + TABLE base_query + ORDER BY ${orderByString} ${orderDirection} + ${limitString} + ${offsetString} + ) sub_query + RIGHT JOIN ( + SELECT count(*) FROM base_query + ) c(count) ON true + """ + .trimIndent(), + ) + .bind() + .map(rowMapperWithTotalCount) + .list() + + val entities = rows.mapNotNull { row -> row.entity } + val count = + rows.firstOrNull()?.count + /** + * Should never happen: the query should always return 1 row with the count, even if the + * results are empty (see [EntityRowWithTotalCount]). + */ + ?: throw IllegalStateException("Failed to get total count of objects in search query") + + return ListWithTotalCount(entities, count) + } + } + + override fun migrate(transformEntity: ((Versioned) -> EntityT)?) { + useTransactionHandle(jdbi) { handle -> + val entities = + handle + .createQuery( + """ + SELECT id, data, version, created_at, modified_at + FROM "${tableName}" + FOR UPDATE + """ + .trimIndent(), + ) + .map(rowMapper) + + val modifiedAt = Instant.now() + + executeBatchOperation( + entities, + // We don't have to check version here, since we use FOR UPDATE above, so we know we have + // the latest version + statement = + """ + UPDATE "${tableName}" + SET + version = :nextVersion, + data = :data::jsonb, + modified_at = :modifiedAt + WHERE + id = :id + """ + .trimIndent(), + bindParameters = { batch, entity -> + val nextVersion = entity.version.next() + val updatedEntity = + if (transformEntity == null) entity.item else transformEntity(entity) + + batch + .bind("nextVersion", nextVersion) + .bind("data", serializationAdapter.toJson(updatedEntity)) + .bind("id", entity.item.id) + .bind("modifiedAt", modifiedAt) + }, + ) + } + } + + /** + * Method that you can override to map exceptions thrown in [create] or [update] to your own + * exception type. This is useful to handle e.g. unique constraint violations: instead of letting + * the database throw an opaque `PSQLException` that may be difficult to handle in layers above, + * you can instead check if the given exception is a unique index violation and map it to a more + * useful exception type here. + * + * If your implementation receives an exception here that it does not want to map, it should just + * return it as-is. + * + * The entity that was attempted to be created or updated is also provided here, so you can add + * extra context to the mapped exception. + * + * Example: + * ``` + * override fun mapCreateOrUpdateException(e: Exception, entity: ExampleEntity): Exception { + * val message = e.message + * if (message != null && message.contains("example_unique_field_index")) { + * return UniqueFieldAlreadyExists(entity, cause = e) + * } + * + * return e + * } + * ``` + */ + protected open fun mapCreateOrUpdateException(e: Exception, entity: EntityT): Exception { + return e + } + + /** + * Uses [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to + * execute the given [statement] on the given [items]. For each item, [bindParameters] is called + * to bind parameters to the statement. The items are divided into multiple batches if the number + * of items exceeds [OPTIMAL_BATCH_SIZE]. + * + * [PreparedBatch.execute] returns an array of modified row counts (1 count for every batch item). + * If you want to handle this, use [handleModifiedRowCounts]. This function is called once for + * every executed batch, which may be more than 1 if the number of items exceeds + * [OPTIMAL_BATCH_SIZE]. A second parameter is provided to [handleModifiedRowCounts] with the + * start index of the current batch, which can then be used to get the corresponding entity for + * diagnostics purposes. + * + * Uses a generic param here for batch items, so we can use this for both whole entities (for + * create/update) and entity IDs (for delete). + */ + private fun executeBatchOperation( + items: Iterable, + statement: String, + bindParameters: (PreparedBatch, BatchItemT) -> PreparedBatch, + handleModifiedRowCounts: ((IntArray, Int) -> Unit)? = null, + ) { + useTransactionHandle(jdbi) { handle -> + var currentBatch: PreparedBatch? = null + var elementCountInCurrentBatch = 0 + var startIndexOfCurrentBatch = 0 + + for ((index, element) in items.withIndex()) { + if (currentBatch == null) { + currentBatch = handle.prepareBatch(statement)!! // Should never return null + startIndexOfCurrentBatch = index + } + + currentBatch = bindParameters(currentBatch, element) + currentBatch.add() + elementCountInCurrentBatch++ + + if (elementCountInCurrentBatch >= OPTIMAL_BATCH_SIZE) { + val modifiedRowCounts = currentBatch.execute() + if (handleModifiedRowCounts != null) { + handleModifiedRowCounts(modifiedRowCounts, startIndexOfCurrentBatch) + } + + currentBatch = null + elementCountInCurrentBatch = 0 + } + } + + // If currentBatch is non-null here, that means we still have remaining entities to update + if (currentBatch != null) { + val executeResult = currentBatch.execute() + if (handleModifiedRowCounts != null) { + handleModifiedRowCounts(executeResult, startIndexOfCurrentBatch) + } + } + } + } +} + +/** + * According to Oracle, the optimal size for batch database operations with JDBC is 50-100: + * https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754 + * + * We stay on the conservative end of 50, since we send JSON which is rather memory inefficient. + */ +internal const val OPTIMAL_BATCH_SIZE = 50 diff --git a/src/main/kotlin/no/liflig/documentstore/repository/Transactions.kt b/src/main/kotlin/no/liflig/documentstore/repository/Transactions.kt index db36aee..1fc0205 100644 --- a/src/main/kotlin/no/liflig/documentstore/repository/Transactions.kt +++ b/src/main/kotlin/no/liflig/documentstore/repository/Transactions.kt @@ -54,3 +54,15 @@ fun transactional(jdbi: Jdbi, block: () -> ReturnT): ReturnT { throw mapDatabaseException(e) } } + +/** + * Short-hand for calling [useHandle] inside [transactional], i.e. starting a transaction and + * getting a database handle inside it, or re-using an existing transaction if one is already + * started on this thread. + * + * Not public, since we only use this internally in [RepositoryJdbi] - if library users find a need + * for this, we may consider making it public. + */ +internal fun useTransactionHandle(jdbi: Jdbi, block: (Handle) -> ReturnT): ReturnT { + return transactional(jdbi) { useHandle(jdbi, block) } +} diff --git a/src/test/kotlin/no/liflig/documentstore/repository/BatchTest.kt b/src/test/kotlin/no/liflig/documentstore/repository/BatchTest.kt new file mode 100644 index 0000000..af0ded7 --- /dev/null +++ b/src/test/kotlin/no/liflig/documentstore/repository/BatchTest.kt @@ -0,0 +1,92 @@ +package no.liflig.documentstore.repository + +import java.text.DecimalFormat +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertNotEquals +import kotlin.test.assertNotNull +import no.liflig.documentstore.entity.Version +import no.liflig.documentstore.entity.Versioned +import no.liflig.documentstore.testutils.exampleRepo +import no.liflig.documentstore.testutils.examples.ExampleEntity +import org.junit.jupiter.api.MethodOrderer +import org.junit.jupiter.api.Order +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.TestMethodOrder + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) // To keep the same entities field between tests +@TestMethodOrder(MethodOrderer.OrderAnnotation::class) +class BatchTest { + // Pad numbers in test text with 0s so that we can sort by text + private val testNumberFormat = DecimalFormat("000") + private val largeBatchSize = OPTIMAL_BATCH_SIZE * 2 + 10 + + private lateinit var entities: List> + + @Order(1) + @Test + fun `test batchCreate`() { + val entitiesToCreate = + (1..largeBatchSize).map { number -> + ExampleEntity(text = "batch-test-${testNumberFormat.format(number)}") + } + entities = exampleRepo.batchCreate(entitiesToCreate) + assertNotEquals(0, entities.size) + + val fetched = exampleRepo.getByTexts(entities.map { it.item.text }) + assertEquals(entities.size, fetched.size) + for ((index, entity) in entities.withIndex()) { + // We order by text in ExampleRepository.getByTexts, so they should be returned in same order + assertEquals(entity, fetched[index]) + } + } + + @Order(2) + @Test + fun `test batchUpdate`() { + val updatedEntities = + entities.withIndex().map { (index, entity) -> + val updatedEntity = + entity.item.copy( + moreText = "batch-update-test-${testNumberFormat.format(index + 1)}", + ) + entity.copy(item = updatedEntity) + } + entities = exampleRepo.batchUpdate(updatedEntities) + + val fetched = exampleRepo.listByIds(entities.map { it.item.id }) + assertEquals(entities.size, fetched.size) + for ((index, entity) in entities.withIndex()) { + assertNotNull(entity.item.moreText) + assertEquals(entity, fetched[index]) + } + } + + @Order(3) + @Test + fun `batchUpdate throws ConflictRepositoryException on wrong versions`() { + val entitiesWithWrongVersion = entities.map { it.copy(version = Version(it.version.value - 1)) } + assertFailsWith { + exampleRepo.batchUpdate(entitiesWithWrongVersion) + } + } + + @Order(4) + @Test + fun `batchDelete throws ConflictRepositoryException on wrong versions`() { + val entitiesWithWrongVersion = entities.map { it.copy(version = Version(it.version.value - 1)) } + assertFailsWith { + exampleRepo.batchDelete(entitiesWithWrongVersion) + } + } + + @Order(5) + @Test + fun `test batchDelete`() { + exampleRepo.batchDelete(entities) + + val fetched = exampleRepo.listByIds(entities.map { it.item.id }) + assertEquals(0, fetched.size) + } +} diff --git a/src/test/kotlin/no/liflig/documentstore/repository/MigrationTest.kt b/src/test/kotlin/no/liflig/documentstore/repository/MigrationTest.kt new file mode 100644 index 0000000..84e35ab --- /dev/null +++ b/src/test/kotlin/no/liflig/documentstore/repository/MigrationTest.kt @@ -0,0 +1,3 @@ +package no.liflig.documentstore.repository + +class MigrationTest {}