Skip to content

Commit

Permalink
Multy-repository transactions support
Browse files Browse the repository at this point in the history
  • Loading branch information
gnuzzz committed Mar 28, 2021
1 parent 5572e5e commit ea13959
Show file tree
Hide file tree
Showing 14 changed files with 445 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.taymyr.play.repository.domain

import akka.Done
import java.util.concurrent.CompletionStage

/**
* DDD repository transaction
*/
interface Transaction {

/**
* Commits transaction.
*/
fun commit(): CompletionStage<Done>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.taymyr.play.repository.domain

import akka.Done
import java.util.concurrent.CompletionStage

/**
* DDD repository for identified aggregate with multy-repository transactions.
*/
interface TransactionalRepository<Aggregate, Identity> : Repository<Aggregate, Identity> {

/**
* Removing aggregate from the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if removing successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Removing aggregates from the repository within multy-repository transaction.
* @param aggregates List of aggregates.
* @param transaction Transaction
* @return [Done] if removing successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Create aggregate on the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if creation successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Create aggregates on the repository within multy-repository transaction.
* @param aggregates Aggregates.
* @param transaction Transaction
* @return [Done] if creation successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Saving aggregate on the repository within multy-repository transaction.
* @param aggregate Aggregate.
* @param transaction Transaction
* @return [Done] if saving successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done>

/**
* Saving aggregates on the repository within multy-repository transaction.
* @param aggregates Aggregates.
* @param transaction Transaction
* @return [Done] if saving successfully. Otherwise will throw an exception.
* @throws Exception Any exceptions while execute a query on the database will wrapped.
*/
fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done>

/**
* Create new multy-repository transaction.
*/
fun createTransaction(): Transaction
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package org.taymyr.play.repository.infrastructure.persistence

import akka.Done
import org.hibernate.Session
import org.taymyr.play.repository.domain.Repository
import org.taymyr.play.repository.domain.Transaction
import org.taymyr.play.repository.domain.TransactionalRepository
import play.db.jpa.JPAApi
import java.io.Serializable
import java.lang.IllegalArgumentException
import java.util.Optional
import java.util.Optional.ofNullable
import java.util.concurrent.CompletableFuture.supplyAsync
Expand All @@ -20,7 +22,7 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
protected val executionContext: DatabaseExecutionContext,
protected val clazz: Class<out Aggregate>,
protected val persistenceUnitName: String = "default"
) : Repository<Aggregate, Identity> {
) : TransactionalRepository<Aggregate, Identity> {

protected fun <E> transaction(function: (EntityManager) -> E): E = jpaApi.withTransaction(persistenceUnitName, function)

Expand Down Expand Up @@ -59,6 +61,11 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
Done.getInstance()
}

override fun remove(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.remove(this, aggregate)
}

override fun removeAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach {
if (em.contains(it)) em.remove(it)
Expand All @@ -67,23 +74,50 @@ abstract class JPARepository<Aggregate : Any, Identity : Serializable> @JvmOverl
Done.getInstance()
}

override fun removeAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.remove(this, aggregates)
}

override fun create(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
em.persist(aggregate)
Done.getInstance()
}

override fun create(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.create(this, aggregate)
}

override fun createAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach { em.persist(it) }
Done.getInstance()
}

override fun createAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.create(this, aggregates)
}

override fun save(aggregate: Aggregate): CompletionStage<Done> = execute { em ->
em.merge(aggregate)
Done.getInstance()
}

override fun save(aggregate: Aggregate, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.save(this, aggregate)
}

override fun saveAll(aggregates: Collection<Aggregate>): CompletionStage<Done> = execute { em ->
aggregates.forEach { em.merge(it) }
Done.getInstance()
}

override fun saveAll(aggregates: Collection<Aggregate>, transaction: Transaction): CompletionStage<Done> {
if (!(transaction is JPATransaction)) throw IllegalArgumentException("transaction must be JPATransaction")
return transaction.save(this, aggregates)
}

override fun createTransaction(): Transaction = JPATransaction(jpaApi, executionContext, persistenceUnitName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.taymyr.play.repository.infrastructure.persistence

import akka.Done
import org.taymyr.play.repository.domain.Transaction
import play.db.jpa.JPAApi
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import javax.persistence.EntityManager

class JPATransaction(
protected val jpaApi: JPAApi,
protected val executionContext: DatabaseExecutionContext,
protected val persistenceUnitName: String = "default"
) : Transaction {

private val oparetionsLog: MutableList<Operation<*>> = mutableListOf()

/**
* Saves to transaction log remove operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate removed aggregate
*/
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Remove(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log remove operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate removed aggregates
*/
fun <AGGREGATE : Any> remove(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Remove(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log create operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate created aggregate
*/
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Create(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log create operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate created aggregates
*/
fun <AGGREGATE : Any> create(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Create(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log save operation for repository and aggregate.
*
* @param repository JPA repository
* @param aggregate saved aggregate
*/
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregate: AGGREGATE): CompletionStage<Done> {
oparetionsLog.add(Save(repository, listOf(aggregate)))
return CompletableFuture.completedFuture(Done.getInstance())
}

/**
* Saves to transaction log save operation for repository and aggregates.
*
* @param repository JPA repository
* @param aggregate saved aggregates
*/
fun <AGGREGATE : Any> save(repository: JPARepository<AGGREGATE, *>, aggregates: Collection<AGGREGATE>): CompletionStage<Done> {
oparetionsLog.add(Save(repository, aggregates))
return CompletableFuture.completedFuture(Done.getInstance())
}

override fun commit(): CompletionStage<Done> = execute { em ->
oparetionsLog.forEach { it.process(em) }
Done.getInstance()
}

protected fun <E> transaction(function: (EntityManager) -> E): E = jpaApi.withTransaction(persistenceUnitName, function)

protected fun <E> execute(function: (EntityManager) -> E): CompletionStage<E> =
CompletableFuture.supplyAsync(Supplier { transaction(function) }, executionContext)

private abstract class Operation<AGGREGATE : Any>(open val repository: JPARepository<AGGREGATE, *>, open val aggregates: Collection<AGGREGATE>) {
abstract fun process(em: EntityManager)
}

private data class Remove<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach {
if (em.contains(it)) em.remove(it)
else em.remove(em.merge(it))
}
}
}

private data class Create<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach { em.persist(it) }
}
}

private data class Save<AGGREGATE : Any>(override val repository: JPARepository<AGGREGATE, *>, override val aggregates: Collection<AGGREGATE>) : Operation<AGGREGATE>(repository, aggregates) {
override fun process(em: EntityManager) {
aggregates.forEach { em.merge(it) }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.taymyr.play.repository.domain

interface Order {
val id: String
val product: Product
val volume: Int?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.taymyr.play.repository.domain

interface OrderRepository : TransactionalRepository<Order, String>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.taymyr.play.repository.domain

interface Product {
val id: String
val name: String;
val volume: Int
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.taymyr.play.repository.domain

interface ProductRepository : TransactionalRepository<Product, String>
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Order
import javax.persistence.Entity
import javax.persistence.FetchType
import javax.persistence.Id
import javax.persistence.JoinColumn
import javax.persistence.ManyToOne
import javax.persistence.Table

@Entity
@Table(name = "ORDERS")
data class OrderImpl(
@Id override val id: String,

@ManyToOne(fetch = FetchType.EAGER)
@JoinColumn(name = "product_id")
override val product: ProductImpl,

override val volume: Int?
) : Order
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Order
import org.taymyr.play.repository.domain.OrderRepository
import play.db.jpa.JPAApi
import java.util.UUID
import javax.inject.Inject

class OrderRepositoryImpl@Inject constructor(
jpaApi: JPAApi,
executionContext: DatabaseExecutionContext
) : JPARepository<Order, String>(jpaApi, executionContext, OrderImpl::class.java), OrderRepository {

override fun nextIdentity(): String = UUID.randomUUID().toString()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Product
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table

@Entity
@Table(name = "PRODUCTS")
data class ProductImpl(
@Id override val id: String,
override val name: String,
override val volume: Int
) : Product
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.taymyr.play.repository.infrastructure.persistence

import org.taymyr.play.repository.domain.Product
import org.taymyr.play.repository.domain.ProductRepository
import play.db.jpa.JPAApi
import java.util.UUID
import javax.inject.Inject

class ProductRepositoryImpl @Inject constructor(
jpaApi: JPAApi,
executionContext: DatabaseExecutionContext
) : JPARepository<Product, String>(jpaApi, executionContext, ProductImpl::class.java), ProductRepository {

override fun nextIdentity(): String = UUID.randomUUID().toString()
}
Loading

0 comments on commit ea13959

Please sign in to comment.