Skip to content

Commit

Permalink
Add list return to batchCreate/batchUpdate
Browse files Browse the repository at this point in the history
Now that we support database-generated IDs, it is useful to get the
generated IDs returned from batchCreate. Previously, we did not return results here
out of fear of allocating too much, but I now believe this is a
premature optimization. If we do see cases where this causes memory
issues, we can consider adding specialized versions of the batch methods
that do not return results.
  • Loading branch information
hermannm committed Oct 21, 2024
1 parent b52fc3d commit ec4bd9a
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 124 deletions.
26 changes: 4 additions & 22 deletions src/main/kotlin/no/liflig/documentstore/repository/Repository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,11 @@ interface Repository<EntityIdT : EntityId, EntityT : Entity<EntityIdT>> {
* The implementation in [RepositoryJdbi.batchCreate] uses
* [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to make the
* implementation as efficient as possible.
*
* This method does not return a list of the created entities. This is because it takes an
* [Iterable], which may be a lazy view into a large collection, and we don't want to assume for
* library users that it's OK to allocate a list of that size. If you find a use-case for getting
* the [Version] of created entities, you should either list them out afterwards with
* [listByIds]/[listAll]/[RepositoryJdbi.getByPredicate], or alert the library authors so we may
* consider returning results here.
*/
fun batchCreate(entities: Iterable<EntityT>) {
fun batchCreate(entities: Iterable<EntityT>): List<Versioned<EntityT>> {
// A default implementation is provided here on the interface, so that implementors don't have
// to implement this themselves (for e.g. mock repositories).
for (entity in entities) {
create(entity)
}
return entities.map { create(it) }
}

/**
Expand All @@ -76,22 +67,13 @@ interface Repository<EntityIdT : EntityId, EntityT : Entity<EntityIdT>> {
* [Prepared Batches from JDBI](https://jdbi.org/releases/3.45.1/#_prepared_batches) to make the
* implementation as efficient as possible.
*
* This method does not return a list of the updated entities. This is because it takes an
* [Iterable], which may be a lazy view into a large collection, and we don't want to assume for
* library users that it's OK to allocate a list of that size. If you find a use-case for getting
* the new [Version] of updated entities, you should either list them out afterwards with
* [listByIds]/[listAll]/[RepositoryJdbi.getByPredicate], or alert the library authors so we may
* consider returning results here.
*
* @throws ConflictRepositoryException If the version on any of the given entities does not match
* the current version of the entity in the database.
*/
fun batchUpdate(entities: Iterable<Versioned<EntityT>>) {
fun batchUpdate(entities: Iterable<Versioned<EntityT>>): List<Versioned<EntityT>> {
// A default implementation is provided here on the interface, so that implementors don't have
// to implement this themselves (for e.g. mock repositories).
for (entity in entities) {
update(entity.item, entity.version)
}
return entities.map { update(it.item, it.version) }
}

/**
Expand Down
229 changes: 148 additions & 81 deletions src/main/kotlin/no/liflig/documentstore/repository/RepositoryJdbi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import no.liflig.documentstore.entity.Version
import no.liflig.documentstore.entity.Versioned
import no.liflig.documentstore.entity.getEntityIdType
import no.liflig.documentstore.utils.executeBatchOperation
import org.jdbi.v3.core.Handle
import org.jdbi.v3.core.Jdbi
import org.jdbi.v3.core.mapper.RowMapper
import org.jdbi.v3.core.statement.Query
Expand Down Expand Up @@ -41,8 +40,8 @@ import org.jdbi.v3.core.statement.Query
* @param jdbi Must have the [DocumentStorePlugin] installed for the queries in this class to work.
* @param serializationAdapter See [SerializationAdapter] for an example of how to implement this.
* @param idsGeneratedByDatabase When using [IntegerEntityId], one often wants the entity IDs to be
* generated by the database. This affects how we perform [create], so if your `id` column is
* `PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY`, you must set this flag to true.
* generated by the database. This affects how we perform [create] (and [batchCreate]), so if your
* `id` column is `PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY`, you must set this flag to true.
*
* The `create` method still takes an entity with an `id`, but when this flag is set, the `id`
* given to `create` is ignored (you can use [IntegerEntityId.GENERATED] as a dummy ID value). The
Expand Down Expand Up @@ -70,14 +69,14 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(

override fun create(entity: EntityT): Versioned<EntityT> {
try {
useHandle(jdbi) { handle ->
val createdAt = Instant.now()
val version = Version.initial()
val createdAt = Instant.now()
val version = Version.initial()

if (idsGeneratedByDatabase) {
return createEntityWithGeneratedId(handle, entity, createdAt, version)
}
if (idsGeneratedByDatabase) {
return createEntityWithGeneratedId(entity, createdAt, version)
}

useHandle(jdbi) { handle ->
handle
.createUpdate(
"""
Expand All @@ -92,8 +91,9 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
.execute()
return Versioned(entity, version, createdAt = createdAt, modifiedAt = createdAt)
}

return Versioned(entity, version, createdAt = createdAt, modifiedAt = createdAt)
} catch (e: Exception) {
// Call mapDatabaseException first to handle connection-related exceptions, before calling
// mapCreateOrUpdateException (which may be overridden by users for custom error handling).
Expand Down Expand Up @@ -130,41 +130,42 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
* although the behavior we have here is more akin to `GENERATED ALWAYS`.
*/
private fun createEntityWithGeneratedId(
handle: Handle,
entity: EntityT,
createdAt: Instant,
version: Version
): Versioned<EntityT> {
val createdEntity =
handle
.createQuery(
"""
WITH generated_id AS (
SELECT nextval(pg_get_serial_sequence('${tableName}', 'id')) AS value
)
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
SELECT
generated_id.value,
jsonb_set(:data::jsonb, '{id}', to_jsonb(generated_id.value)),
:version,
:createdAt,
:modifiedAt
FROM generated_id
RETURNING data
"""
.trimIndent(),
)
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
.map(entityDataMapper)
.firstOrNull()
?: throw IllegalStateException(
"INSERT query for entity with generated ID did not return entity data",
)
val entityWithGeneratedId =
useHandle(jdbi) { handle ->
handle
.createQuery(
"""
WITH generated_id AS (
SELECT nextval(pg_get_serial_sequence('${tableName}', 'id')) AS value
)
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
SELECT
generated_id.value,
jsonb_set(:data::jsonb, '{id}', to_jsonb(generated_id.value)),
:version,
:createdAt,
:modifiedAt
FROM generated_id
RETURNING data
"""
.trimIndent(),
)
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
.map(entityDataMapper)
.firstOrNull()
?: throw IllegalStateException(
"INSERT query for entity with generated ID did not return entity data",
)
}

return Versioned(createdEntity, version, createdAt = createdAt, modifiedAt = createdAt)
return Versioned(entityWithGeneratedId, version, createdAt = createdAt, modifiedAt = createdAt)
}

override fun get(id: EntityIdT, forUpdate: Boolean): Versioned<EntityT>? {
Expand Down Expand Up @@ -279,17 +280,21 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
return getByPredicate() // Defaults to all
}

override fun batchCreate(entities: Iterable<EntityT>) {
transactional {
useHandle(jdbi) { handle ->
val createdAt = Instant.now()
val version = Version.initial()
override fun batchCreate(entities: Iterable<EntityT>): List<Versioned<EntityT>> {
val size = entities.sizeIfKnown()
if (size == 0) {
return emptyList()
}

if (idsGeneratedByDatabase) {
batchCreateEntitiesWithGeneratedId(handle, entities, createdAt, version)
return@transactional
}
val createdAt = Instant.now()
val version = Version.initial()

if (idsGeneratedByDatabase) {
return batchCreateEntitiesWithGeneratedId(entities, size, createdAt, version)
}

transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
Expand All @@ -310,47 +315,87 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
)
}
}

// We wait until here to create the result list, which may be large, to avoid allocating it
// before calling the database. That would keep the list in memory while we are waiting for the
// database, needlessly reducing throughput.
return entities.map { entity ->
Versioned(entity, version, createdAt = createdAt, modifiedAt = createdAt)
}
}

/**
* See:
* - [createEntityWithGeneratedId] for the challenges with generated IDs, which we also face here
* - [no.liflig.documentstore.utils.executeBatch] for how we handle returning the data from our
* created entities (which we need here in order to get the generated IDs)
*/
private fun batchCreateEntitiesWithGeneratedId(
handle: Handle,
entities: Iterable<EntityT>,
size: Int?,
createdAt: Instant,
version: Version
) {
executeBatchOperation(
handle,
entities,
statement =
"""
WITH generated_id AS (
SELECT nextval(pg_get_serial_sequence('${tableName}', 'id')) AS value
)
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
SELECT
generated_id.value,
jsonb_set(:data::jsonb, '{id}', to_jsonb(generated_id.value)),
:version,
:createdAt,
:modifiedAt
FROM generated_id
"""
.trimIndent(),
bindParameters = { batch, entity ->
batch
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
},
)
}
): List<Versioned<EntityT>> {
// If we know the size of the given entities, we want to pre-allocate capacity for the result
val entitiesWithGeneratedId: ArrayList<Versioned<EntityT>> =
if (size != null) ArrayList(size) else ArrayList()

override fun batchUpdate(entities: Iterable<Versioned<EntityT>>) {
transactional {
useHandle(jdbi) { handle ->
val now = Instant.now()
executeBatchOperation(
handle,
entities,
statement =
"""
WITH generated_id AS (
SELECT nextval(pg_get_serial_sequence('${tableName}', 'id')) AS value
)
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
SELECT
generated_id.value,
jsonb_set(:data::jsonb, '{id}', to_jsonb(generated_id.value)),
:version,
:createdAt,
:modifiedAt
FROM generated_id
"""
.trimIndent(),
bindParameters = { batch, entity ->
batch
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
},
columnsToReturn = arrayOf(Columns.DATA),
handleReturnedColumns = { resultSet ->
for (entityWithGeneratedId in resultSet.map(entityDataMapper)) {
entitiesWithGeneratedId.add(
Versioned(
entityWithGeneratedId,
version,
createdAt = createdAt,
modifiedAt = createdAt,
),
)
}
},
)
}
}

return entitiesWithGeneratedId
}

override fun batchUpdate(entities: Iterable<Versioned<EntityT>>): List<Versioned<EntityT>> {
if (entities.sizeIfKnown() == 0) {
return emptyList()
}

val modifiedAt = Instant.now()

transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
Expand All @@ -372,7 +417,7 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
batch
.bind("data", serializationAdapter.toJson(entity.item))
.bind("nextVersion", nextVersion)
.bind("modifiedAt", now)
.bind("modifiedAt", modifiedAt)
.bind("id", entity.item.id)
.bind("previousVersion", entity.version)
},
Expand All @@ -382,9 +427,20 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
)
}
}

// We wait until here to create the result list, which may be large, to avoid allocating it
// before calling the database. That would keep the list in memory while we are waiting for the
// database, needlessly reducing throughput.
return entities.map { entity ->
entity.copy(modifiedAt = modifiedAt, version = entity.version.next())
}
}

override fun batchDelete(entities: Iterable<Versioned<EntityT>>) {
if (entities.sizeIfKnown() == 0) {
return
}

transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
Expand Down Expand Up @@ -596,3 +652,14 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
}
}
}

/**
 * Returns the element count of this Iterable when the receiver is a [Collection], and `null` for
 * any other kind of Iterable.
 *
 * The batch methods on RepositoryJdbi use this to return early when handed an empty collection,
 * and to pre-allocate the result list when a size is available.
 *
 * The Kotlin standard library performs the same `is Collection` check in e.g. [Iterable.map].
 */
private fun Iterable<*>.sizeIfKnown(): Int? = (this as? Collection<*>)?.size
Loading

0 comments on commit ec4bd9a

Please sign in to comment.