Skip to content

Commit

Permalink
corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Mar 5, 2024
1 parent 3f8044b commit f22f2e8
Show file tree
Hide file tree
Showing 39 changed files with 486 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class FileSystemProcessInstances implements MutableProcessInstances {
private ProcessInstanceMarshallerService marshaller;

public FileSystemProcessInstances(Process<?> process, Path storage) {
this(process, storage, ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build());
this(process, storage, ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build());
}

public FileSystemProcessInstances(Process<?> process, Path storage, ProcessInstanceMarshallerService marshaller) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,37 @@ public boolean tryAdvance(Consumer<? super Record> action) {
private static String sqlIncludingVersion(String statement, String processVersion) {
return statement + " " + (processVersion == null ? PROCESS_VERSION_IS_NULL : PROCESS_VERSION_EQUALS_TO);
}

@Override
void migrate(String processId, String processVersion, String targetProcessId, String targetProcessVersion) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sqlIncludingVersion(Repository.MIGRATE_BULK, processVersion))) {
statement.setString(1, targetProcessId);
statement.setString(2, targetProcessVersion);
statement.setString(3, processId);
if (processVersion != null) {
statement.setString(4, processVersion);
}
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s-%s", processId, processVersion);
}
}

@Override
void migrate(String processId, String processVersion, String targetProcessId, String targetProcessVersion, String[] processIds) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sqlIncludingVersion(Repository.MIGRATE_INSTANCE, processVersion))) {
statement.setString(1, targetProcessId);
statement.setString(2, targetProcessVersion);
statement.setObject(3, connection.createArrayOf("VARCHAR", processIds));
statement.setString(4, processId);
if (processVersion != null) {
statement.setString(5, processVersion);
}
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s-%s", processId, processVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class JDBCProcessInstances implements MutableProcessInstances {
public JDBCProcessInstances(Process<?> process, DataSource dataSource, boolean lock) {
this.process = process;
this.lock = lock;
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build();
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build();
this.repository = new GenericRepository(dataSource);
}

Expand Down Expand Up @@ -88,6 +88,16 @@ public void update(String id, ProcessInstance instance) {
}
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion) {
repository.migrate(process.id(), process.version(), targetProcessId, targetProcessVersion);
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion, String... processIds) {
repository.migrate(process.id(), process.version(), targetProcessId, targetProcessVersion, processIds);
}

@Override
public void remove(String id) {
LOGGER.debug("Removing process instance id: {}, processId: {}", id, process.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ abstract class Repository {
static final String DELETE = "DELETE FROM process_instances WHERE process_id = ? and id = ?";
static final String PROCESS_VERSION_EQUALS_TO = "and process_version = ?";
static final String PROCESS_VERSION_IS_NULL = "and process_version is null";
static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = ?, process_version = ? WHERE process_id = ? ";
static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = ?, process_version = ? WHERE id = ANY (?) and process_id = ? ";

static class Record {
private final byte[] payload;
Expand Down Expand Up @@ -66,4 +68,8 @@ public Record(byte[] payload, long version) {
protected RuntimeException uncheckedException(Exception ex, String message, Object... param) {
return new RuntimeException(String.format(message, param), ex);
}

abstract void migrate(String id, String version, String targetProcessId, String targetProcessVersion);

abstract void migrate(String id, String version, String targetProcessId, String targetProcessVersion, String[] processIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.kie.persistence.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Optional;

import javax.sql.DataSource;
Expand All @@ -39,6 +42,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_ACTIVE;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_COMPLETED;
import static org.kie.kogito.test.utils.ProcessInstancesTestUtils.abort;
Expand All @@ -54,6 +58,8 @@ abstract class AbstractProcessInstancesIT {
public static final String TEST_ID = "02ac3854-46ee-42b7-8b63-5186c9889d96";
public static SecurityPolicy securityPolicy = SecurityPolicy.of(IdentityProviders.of("john"));

DataSource dataSource;

public static void initMigration(JdbcDatabaseContainer container, String dbKind) {
Flyway flyway = Flyway.configure().dataSource(container.getJdbcUrl(),
container.getUsername(),
Expand Down Expand Up @@ -210,6 +216,62 @@ public void testUpdate() {
assertEmpty(process.instances());
}

@Test
public void testMigrateAll() throws Exception {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrate("migrated", "2");

DataSource dataSource = getDataSource();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances")) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "migrated");
assertEquals(resultSet.getString(2), "2");
}
}
}

@Test
public void testMigrateSingle() throws Exception {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrate("migrated", "2", processInstance1.id());

DataSource dataSource = getDataSource();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances")) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "migrated");
assertEquals(resultSet.getString(2), "2");
}
}

try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances WHERE id = " + processInstance2.id())) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "BPMN2_UserTask");
assertEquals(resultSet.getString(2), "1.0");
}
}

}

@Test
public void testRemove() {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public MongoDBProcessInstances(MongoClient mongoClient, org.kie.kogito.process.P
this.collection = Objects.requireNonNull(getCollection(mongoClient, process.id(), dbName));
this.marshaller = ProcessInstanceMarshallerService.newBuilder()
.withDefaultObjectMarshallerStrategies()
.withDefaultListeners()
.withContextEntries(singletonMap(MarshallerContextName.MARSHALLER_FORMAT, MarshallerContextName.MARSHALLER_FORMAT_JSON))
.build();
this.transactionManager = Objects.requireNonNull(transactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.naming.OperationNotSupportedException;

import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
Expand Down Expand Up @@ -57,8 +55,8 @@ public class PostgresqlProcessInstances implements MutableProcessInstances {
private static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE process_id = $1 and id = $2 and process_version ";
private static final String FIND_ALL = "SELECT payload, version FROM process_instances WHERE process_id = $1 and process_version ";
private static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = $1, version = $2 WHERE process_id = $3 and id = $4 and version = $5 and process_version ";
private static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and process_version ";
private static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = $1 and process_version = $2 WHERE process_id = $3 and id = $4 and process_version ";
private static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = $1, process_version = $2 WHERE process_id = $3 and process_version ";
private static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = $1, process_version = $2 WHERE process_id = $3 and id = ANY ($4) and process_version ";

private final Process<?> process;
private final PgPool client;
Expand All @@ -70,7 +68,7 @@ public PostgresqlProcessInstances(Process<?> process, PgPool client, Long queryT
this.process = process;
this.client = client;
this.queryTimeoutMillis = queryTimeoutMillis;
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build();
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build();
this.lock = lock;
}

Expand Down Expand Up @@ -167,7 +165,7 @@ private RuntimeException uncheckedException(Exception ex, String message, Object
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException {
public void migrate(String targetProcessId, String targetProcessVersion) {
try {
Future<RowSet<Row>> future = null;
if (process.version() == null) {
Expand All @@ -185,13 +183,13 @@ public void migrate(String targetProcessId, String targetProcessVersion) throws
}

@Override
public void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException {
public void migrate(String targetProcessId, String targetProcessVersion, String... processIds) {
try {
Future<RowSet<Row>> future = null;
if (process.version() == null) {
future = client.preparedQuery(MIGRATE_BULK + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id()));
future = client.preparedQuery(MIGRATE_INSTANCE + IS_NULL).execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id()));
} else {
future = client.preparedQuery(MIGRATE_BULK + "= $5").execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id(), process.version()));
future = client.preparedQuery(MIGRATE_INSTANCE + "= $5").execute(tuple(targetProcessId, targetProcessVersion, processIds, process.id(), process.version()));
}
getExecutedResult(future);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
import org.testcontainers.junit.jupiter.Testcontainers;

import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_ACTIVE;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_COMPLETED;
import static org.kie.kogito.test.utils.ProcessInstancesTestUtils.abort;
Expand Down Expand Up @@ -198,6 +202,46 @@ public void testUpdate() {

}

@Test
public void testMigrateAll() throws Exception {
BpmnProcess process = createProcess("BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrate("migrated", "2");
RowSet<Row> rows = client.preparedQuery("SELECT process_id, process_version FROM process_instances").execute().toCompletionStage().toCompletableFuture().get();
for (Row row : rows) {
assertEquals(row.get(String.class, 1), "migrated");
assertEquals(row.get(String.class, 2), "2");
}
}

@Test
public void testMigrateSingle() throws Exception {
BpmnProcess process = createProcess("BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrate("migrated", "2", processInstance1.id());
RowSet<Row> rows = null;
rows = client.preparedQuery("SELECT process_id, process_version FROM process_instances WHERE id = $1").execute(Tuple.of(processInstance1.id())).toCompletionStage().toCompletableFuture().get();
for (Row row : rows) {
assertEquals(row.get(String.class, 1), "migrated");
assertEquals(row.get(String.class, 2), "2");
}
rows = client.preparedQuery("SELECT process_id, process_version FROM process_instances WHERE id = $1").execute(Tuple.of(processInstance2.id())).toCompletionStage().toCompletableFuture().get();
for (Row row : rows) {
assertEquals(row.get(String.class, 1), "BPMN2_UserTask");
assertEquals(row.get(String.class, 2), "1.0");
}
}

@Test
public void testRemove() {
BpmnProcess process = createProcess("BPMN2-UserTask.bpmn2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RocksDBProcessInstances<T> implements MutableProcessInstances<T> {

public RocksDBProcessInstances(Process<T> process, RocksDB db) {
this.process = process;
marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build();
marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build();
this.db = db;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@
import org.jbpm.workflow.core.Node;
import org.jbpm.workflow.core.WorkflowProcess;
import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.auth.SecurityPolicy;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessError;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceExecutionException;
import org.kie.kogito.process.ProcessService;
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.WorkItem;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;

import com.fasterxml.jackson.databind.JsonNode;

import static java.util.Collections.singletonMap;

public abstract class BaseProcessInstanceManagementResource<T> implements ProcessInstanceManagement<T> {

private static final String PROCESS_REQUIRED = "Process id must be given";
Expand Down Expand Up @@ -127,6 +131,26 @@ public T doGetInstanceInError(String processId, String processInstanceId) {
});
}

public T doMigrateInstance(ProcessService processService, String processId, ProcessMigrationSpec migrationSpec, String processInstanceId) {
try {
Process<? extends Model> process = processes.get().processById(processId);
processService.migrate(process, migrationSpec.getTargetProcessId(), migrationSpec.getTargetProcessVersion(), processInstanceId);
return buildOkResponse(singletonMap("message", processInstanceId + " instance migrated"));
} catch (Exception e) {
return badRequestResponse(e.getMessage());
}
}

public T doMigrateAllInstances(ProcessService processService, String processId, ProcessMigrationSpec migrationSpec) {
try {
Process<? extends Model> process = processes.get().processById(processId);
processService.migrate(process, migrationSpec.getTargetProcessId(), migrationSpec.getTargetProcessVersion());
return buildOkResponse(singletonMap("message", "All intances migrated"));
} catch (Exception e) {
return badRequestResponse(e.getMessage());
}
}

public T doGetWorkItemsInProcessInstance(String processId, String processInstanceId) {

return executeOnProcessInstance(processId, processInstanceId, processInstance -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public interface ProcessInstanceManagement<T> {
T cancelNodeInstanceId(String processId, String processInstanceId, String nodeInstanceId);

T cancelProcessInstanceId(String processId, String processInstanceId);

T migrateAllInstances(String processId, ProcessMigrationSpec migrationSpec);

T migrateInstance(String processId, String processInstanceId, ProcessMigrationSpec migrationSpec);

}
Loading

0 comments on commit f22f2e8

Please sign in to comment.