Skip to content

Commit

Permalink
Multy-repository transactions support
Browse files Browse the repository at this point in the history
  • Loading branch information
gnuzzz committed Mar 27, 2021
1 parent 5572e5e commit 564ccf8
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ interface Repository<Aggregate, Identity> {
*/
fun remove(aggregate: Aggregate): CompletionStage<Done>

/**
* Removing aggregate from the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if removing successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Removing aggregates from the repository.
* @param aggregates List of aggregates.
Expand All @@ -62,6 +71,15 @@ interface Repository<Aggregate, Identity> {
*/
fun removeAll(aggregates: Collection<Aggregate>): CompletionStage<Done>

/**
* Removing aggregates from the repository within multy-repository transaction.
* @param aggregates List of aggregates.
* @param transaction Transaction
* @return [Done] if removing successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Create aggregate on the repository.
* @param aggregate Aggregate.
Expand All @@ -70,6 +88,15 @@ interface Repository<Aggregate, Identity> {
*/
fun create(aggregate: Aggregate): CompletionStage<Done>

/**
* Create aggregate on the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if creation successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Create aggregates on the repository.
* @param aggregates Aggregates.
Expand All @@ -78,6 +105,15 @@ interface Repository<Aggregate, Identity> {
*/
fun createAll(aggregates: Collection<Aggregate>): CompletionStage<Done>

/**
* Create aggregates on the repository within multy-repository transaction.
* @param aggregates Aggregates.
* @param transaction Transaction
* @return [Done] if creation successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Saving aggregate on the repository.
* @param aggregate Aggregate.
Expand All @@ -86,11 +122,34 @@ interface Repository<Aggregate, Identity> {
*/
fun save(aggregate: Aggregate): CompletionStage<Done>

/**
* Saving aggregate on the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if saving successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Saving aggregates on the repository.
* @param aggregates Aggregates.
* @return [Done] if saving successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun saveAll(aggregates: Collection<Aggregate>): CompletionStage<Done>

/**
* Saving aggregates on the repository within multy-repository transaction.
* @param aggregates Aggregates.
* @param transaction Transaction
* @return [Done] if saving successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Create new multy-repository transaction.
*/
fun createTransaction(): Transaction
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.taymyr.play.repository.domain

import akka.Done
import java.util.concurrent.CompletionStage

/**
* DDD repository transaction
*/
interface Transaction {

/**
* Commits transaction.
*/
fun commit(): CompletionStage<Done>
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package org.taymyr.play.repository.infrastructure.persistence
import akka.Done
import org.hibernate.Session
import org.taymyr.play.repository.domain.Repository
import org.taymyr.play.repository.domain.Transaction
import play.db.jpa.JPAApi
import java.io.Serializable
import java.lang.IllegalArgumentException
import java.util.Optional
import java.util.Optional.ofNullable
import java.util.concurrent.CompletableFuture.supplyAsync
Expand Down Expand Up @@ -59,6 +61,11 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
Done.getInstance()
}

override fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.remove(this, aggregate)
}

override fun removeAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach {
if (em.contains(it)) em.remove(it)
Expand All @@ -67,23 +74,50 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
Done.getInstance()
}

override fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.remove(this, aggregates)
}

override fun create(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
em.persist(aggregate)
Done.getInstance()
}

override fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.create(this, aggregate)
}

override fun createAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach { em.persist(it) }
Done.getInstance()
}

override fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.create(this, aggregates)
}

override fun save(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
em.merge(aggregate)
Done.getInstance()
}

override fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.save(this, aggregate)
}

override fun saveAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach { em.merge(it) }
Done.getInstance()
}

override fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.save(this, aggregates)
}

override fun createTransaction(): Transaction = JPATransaction(jpaApi, executionContext, persistenceUnitName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.taymyr.play.repository.infrastructure.persistence

import akka.Done
import org.taymyr.play.repository.domain.Transaction
import play.db.jpa.JPAApi
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import javax.persistence.EntityManager

class JPATransaction(
protected val jpaApi: JPAApi,
protected val executionContext: DatabaseExecutionContext,
protected val persistenceUnitName: String = "default"
) : Transaction {

private val oparetionsLog: MutableList<Operation<*>> = mutableListOf()

/**
* Saves to transaction log remove operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate removed aggregate
*/
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Remove(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log remove operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate removed aggregates
*/
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Remove(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log create operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate created aggregate
*/
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Create(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log create operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate created aggregates
*/
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Create(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log save operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate saved aggregate
*/
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Save(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log save operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate saved aggregates
*/
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Save(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

override fun commit(): CompletionStage<Done> = execute { em ->
oparetionsLog.forEach { it.process(em) }
Done.getInstance()
}

protected fun <E> transaction(function: (EntityManager) -> E): E = jpaApi.withTransaction(persistenceUnitName, function)

protected fun <E> execute(function: (EntityManager) -> E): CompletionStage<E> =
CompletableFuture.supplyAsync(Supplier { transaction(function) }, executionContext)

private abstract class Operation<AGGREGATE : Any>(open val repository: JPARepository<AGGREGATE, *>, open val aggregates: Collection<AGGREGATE>) {
abstract fun process(em: EntityManager)
}

private data class Remove<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach {
if (em.contains(it)) em.remove(it)
else em.remove(em.merge(it))
}
}
}

private data class Create<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach { em.persist(it) }
}
}

private data class Save<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach { em.merge(it) }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.taymyr.play.repository.domain

interface Address {
val id: String
val zip: String?
val city: String
val street: String
val user: User
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.taymyr.play.repository.domain

interface AddressRepository : Repository<Address, String>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Address
import javax.persistence.Entity
import javax.persistence.FetchType
import javax.persistence.Id
import javax.persistence.JoinColumn
import javax.persistence.ManyToOne
import javax.persistence.Table

@Entity
@Table(name = "ADDRESS")
data class AddressImpl(

@Id override val id: String,

override val zip: String?,

override val city: String,

override val street: String,

@ManyToOne(fetch = FetchType.EAGER)
@JoinColumn(name = "user_id")
override val user: UserImpl
) : Address
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Address
import org.taymyr.play.repository.domain.AddressRepository
import play.db.jpa.JPAApi
import java.util.UUID
import javax.inject.Inject

class AddressRepositoryImpl @Inject constructor(
jpaApi: JPAApi,
executionContext: DatabaseExecutionContext
) : JPARepository<Address, String>(jpaApi, executionContext, AddressImpl::class.java), AddressRepository {

override fun nextIdentity(): String = UUID.randomUUID().toString()
}
Loading

0 comments on commit 564ccf8

Please sign in to comment.