Skip to content

Commit

Permalink
[incubator-kie-issues-916] add marking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Mar 1, 2024
1 parent c6b9aca commit 76f8e41
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.persistence.postgresql;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand All @@ -26,6 +27,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.naming.OperationNotSupportedException;

import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
Expand Down Expand Up @@ -54,6 +57,8 @@ public class PostgresqlProcessInstances implements MutableProcessInstances {
private static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE process_id = $1 and id = $2 and process_version ";
private static final String FIND_ALL = "SELECT payload, version FROM process_instances WHERE process_id = $1 and process_version ";
private static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = $1, version = $2 WHERE process_id = $3 and id = $4 and version = $5 and process_version ";
private static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and process_version ";
private static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and id = $4 and process_version ";

private final Process<?> process;
private final PgPool client;
Expand Down Expand Up @@ -161,6 +166,42 @@ private RuntimeException uncheckedException(Exception ex, String message, Object
return new RuntimeException(String.format(message, param), ex);
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException {
try {
Future<RowSet<Row>> future = null;
if (process.version() == null) {
future = client.preparedQuery(MIGRATE_BULK + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, process.id()));
} else {
future = client.preparedQuery(MIGRATE_BULK + "= $4").execute(tuple(targetProcessId, targetProcessVersion, process.id(), process.version()));
}
getExecutedResult(future);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw uncheckedException(e, "Error migration process instance %s %s", process.id(), process.version());
} catch (Exception e) {
throw uncheckedException(e, "Error deleting process instance %s %s", process.id(), process.version());
}
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException {
try {
Future<RowSet<Row>> future = null;
if (process.version() == null) {
future = client.preparedQuery(MIGRATE_BULK + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id()));
} else {
future = client.preparedQuery(MIGRATE_BULK + "= $5").execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id(), process.version()));
}
getExecutedResult(future);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw uncheckedException(e, "Error deleting process instance %s", Arrays.toString(processIds));
} catch (Exception e) {
throw uncheckedException(e, "Error deleting process instance %s", Arrays.toString(processIds));
}
}

private boolean updateInternal(String id, byte[] payload) {
try {
Future<RowSet<Row>> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Optional;
import java.util.stream.Stream;

import javax.naming.OperationNotSupportedException;

public interface ProcessInstances<T> {

default Optional<ProcessInstance<T>> findById(String id) {
Expand All @@ -29,13 +31,23 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);


default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();

default void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException {
throw new OperationNotSupportedException();
}

default void migrate(String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException {
throw new OperationNotSupportedException();
}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);

default Stream<ProcessInstance<T>> stream() {
return stream(ProcessInstanceReadMode.READ_ONLY);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Optional;
import java.util.function.Function;

import javax.naming.OperationNotSupportedException;

import org.kie.kogito.MapOutput;
import org.kie.kogito.MappableToModel;
import org.kie.kogito.Model;
Expand Down Expand Up @@ -65,6 +67,10 @@ <T extends Model> ProcessInstance<T> createProcessInstance(Process<T> process, S

<T extends MappableToModel<R>, R> Optional<R> findById(Process<T> process, String id);

<T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion, String ...id) throws OperationNotSupportedException;

<T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException;

<T extends MappableToModel<R>, R> Optional<R> delete(Process<T> process, String id);

<T extends MappableToModel<R>, R> Optional<R> update(Process<T> process, String id, T resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.naming.OperationNotSupportedException;

import org.jbpm.process.instance.impl.humantask.HumanTaskHelper;
import org.jbpm.process.instance.impl.humantask.HumanTaskTransition;
import org.jbpm.util.JsonSchemaUtil;
Expand Down Expand Up @@ -114,6 +116,16 @@ public <T extends MappableToModel<R>, R> Optional<R> findById(Process<T> process
return mappable.map(MappableToModel::toModel);
}

@Override
public <T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion, String... processIds) throws OperationNotSupportedException {
process.instances().migrate(targetProcessId, targetProcessVersion, processIds);
}

@Override
public <T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException {
process.instances().migrate(targetProcessId, targetProcessVersion);
}

@Override
public <T extends MappableToModel<R>, R> Optional<R> delete(Process<T> process, String id) {
return UnitOfWorkExecutor.executeInUnitOfWork(
Expand Down Expand Up @@ -400,4 +412,5 @@ public <T extends Model> Map<String, Object> getSchemaAndPhases(Process<T> proce
new Policy<?>[] { policy },
JsonSchemaUtil.load(Thread.currentThread().getContextClassLoader(), process.id(), taskName));
}

}

0 comments on commit 76f8e41

Please sign in to comment.