diff --git a/addons/common/persistence/postgresql/src/main/java/org/kie/kogito/persistence/postgresql/PostgresqlProcessInstances.java b/addons/common/persistence/postgresql/src/main/java/org/kie/kogito/persistence/postgresql/PostgresqlProcessInstances.java index 4e08915476d..90e9dacb0e3 100644 --- a/addons/common/persistence/postgresql/src/main/java/org/kie/kogito/persistence/postgresql/PostgresqlProcessInstances.java +++ b/addons/common/persistence/postgresql/src/main/java/org/kie/kogito/persistence/postgresql/PostgresqlProcessInstances.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.persistence.postgresql; +import java.util.Arrays; import java.util.Iterator; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -26,6 +27,8 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.naming.OperationNotSupportedException; + import org.jbpm.flow.serialization.ProcessInstanceMarshallerService; import org.kie.kogito.process.MutableProcessInstances; import org.kie.kogito.process.Process; @@ -54,6 +57,8 @@ public class PostgresqlProcessInstances implements MutableProcessInstances { private static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE process_id = $1 and id = $2 and process_version "; private static final String FIND_ALL = "SELECT payload, version FROM process_instances WHERE process_id = $1 and process_version "; private static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = $1, version = $2 WHERE process_id = $3 and id = $4 and version = $5 and process_version "; + private static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and process_version "; + private static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and id = $4 and process_version "; private final Process process; private final PgPool client; @@ -161,6 +166,42 @@ private RuntimeException uncheckedException(Exception ex, String message, Object return new RuntimeException(String.format(message, param), ex); } + @Override + public void migrate(String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException { + try { + Future> future = null; + if (process.version() == null) { + future = client.preparedQuery(MIGRATE_BULK + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, process.id())); + } else { + future = client.preparedQuery(MIGRATE_BULK + "= $4").execute(tuple(targetProcessId, targetProcessVersion, process.id(), process.version())); + } + getExecutedResult(future); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw uncheckedException(e, "Error migration process instance %s %s", process.id(), process.version()); + } catch (Exception e) { + throw uncheckedException(e, "Error deleting process instance %s %s", process.id(), process.version()); + } + } + + @Override + public void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException { + try { + Future> future = null; + if (process.version() == null) { + future = client.preparedQuery(MIGRATE_BULK + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id())); + } else { + future = client.preparedQuery(MIGRATE_BULK + "= $5").execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id(), process.version())); + } + getExecutedResult(future); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw uncheckedException(e, "Error deleting process instance %s", Arrays.toString(processIds)); + } catch (Exception e) { + throw uncheckedException(e, "Error deleting process instance %s", Arrays.toString(processIds)); + } + } + private boolean updateInternal(String id, byte[] payload) { try { Future> future = diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java index 6cc0d95649c..522f04d03f0 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java @@ -21,6 +21,8 @@ import java.util.Optional; import java.util.stream.Stream; +import javax.naming.OperationNotSupportedException; + public interface ProcessInstances { default Optional> findById(String id) { @@ -29,8 +31,16 @@ default Optional> findById(String id) { Optional> findById(String id, ProcessInstanceReadMode mode); + default Optional> findByBusinessKey(String id) { return stream().filter(pi -> id.equals(pi.businessKey())).findAny(); + + default void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException { + throw new OperationNotSupportedException(); + } + + default void migrate(String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException { + throw new OperationNotSupportedException(); } Stream> stream(ProcessInstanceReadMode mode); @@ -38,4 +48,6 @@ default Optional> findByBusinessKey(String id) { default Stream> stream() { return stream(ProcessInstanceReadMode.READ_ONLY); } + + } diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessService.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessService.java index d46e32a308d..f651d2a8616 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessService.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessService.java @@ -25,6 +25,8 @@ import java.util.Optional; import java.util.function.Function; +import javax.naming.OperationNotSupportedException; + import org.kie.kogito.MapOutput; import org.kie.kogito.MappableToModel; import org.kie.kogito.Model; @@ -65,6 +67,10 @@ ProcessInstance createProcessInstance(Process process, S , R> Optional findById(Process process, String id); + void migrate(Process process, String targetProcessId, String targetProcessVersion, String ...id) throws OperationNotSupportedException; + + void migrate(Process process, String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException; + , R> Optional delete(Process process, String id); , R> Optional update(Process process, String id, T resource); diff --git a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/ProcessServiceImpl.java b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/ProcessServiceImpl.java index ec004f755fe..c0d9f902779 100644 --- a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/ProcessServiceImpl.java +++ b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/ProcessServiceImpl.java @@ -27,6 +27,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.naming.OperationNotSupportedException; + import org.jbpm.process.instance.impl.humantask.HumanTaskHelper; import org.jbpm.process.instance.impl.humantask.HumanTaskTransition; import org.jbpm.util.JsonSchemaUtil; @@ -114,6 +116,16 @@ public , R> Optional findById(Process process return mappable.map(MappableToModel::toModel); } + @Override + public void migrate(Process process, String targetProcessId, String targetProcessVersion, String... processIds) throws OperationNotSupportedException { + process.instances().migrate(targetProcessId, targetProcessVersion, processIds); + } + + @Override + public void migrate(Process process, String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException { + process.instances().migrate(targetProcessId, targetProcessVersion); + } + @Override public , R> Optional delete(Process process, String id) { return UnitOfWorkExecutor.executeInUnitOfWork( @@ -400,4 +412,5 @@ public Map getSchemaAndPhases(Process proce new Policy[] { policy }, JsonSchemaUtil.load(Thread.currentThread().getContextClassLoader(), process.id(), taskName)); } + }