Skip to content

Commit

Permalink
[incubator-kie-issues-861] unify identifiers / part 1 (apache#3403)
Browse files Browse the repository at this point in the history
* [incubator-kie-issues-916] unify identifiers / part 1

* [incubator-kie-issues-916] PIM / migration plan impl

* [incubator-kie-issues-916] PIM jbpm migration module added

* [incubator-kie-issues-916] add marking logic

* [incubator-kie-issues-916] add process event data state change
  • Loading branch information
elguardian authored and rgdoliveira committed Apr 16, 2024
1 parent 6fbd0f8 commit 79a0ade
Show file tree
Hide file tree
Showing 302 changed files with 42,775 additions and 38,060 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceDuplicatedException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

@SuppressWarnings({ "rawtypes" })
public class FileSystemProcessInstances implements MutableProcessInstances {
Expand All @@ -50,7 +50,7 @@ public class FileSystemProcessInstances implements MutableProcessInstances {
private ProcessInstanceMarshallerService marshaller;

public FileSystemProcessInstances(Process<?> process, Path storage) {
this(process, storage, ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build());
this(process, storage, ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build());
}

public FileSystemProcessInstances(Process<?> process, Path storage, ProcessInstanceMarshallerService marshaller) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.util.CloseableIterator;
import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
Expand All @@ -39,7 +40,6 @@
import org.kie.kogito.process.ProcessInstanceOptimisticLockingException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

@SuppressWarnings({ "rawtypes" })
public class CacheProcessInstances implements MutableProcessInstances {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.bpmn2.BpmnProcess;
import org.kie.kogito.process.bpmn2.BpmnVariables;
Expand Down Expand Up @@ -93,6 +94,7 @@ public void testBasic() {
when(mockCreatePi.status()).thenReturn(ProcessInstance.STATE_ACTIVE);
when(mockCreatePi.internalGetProcessInstance()).thenReturn(createPi);
when(mockCreatePi.id()).thenReturn(TEST_ID);
when(mockCreatePi.process()).thenReturn((Process) process);
pi.create(TEST_ID, mockCreatePi);
assertOne(pi);
assertThat(pi.exists(TEST_ID)).isTrue();
Expand All @@ -104,6 +106,7 @@ public void testBasic() {
when(mockUpdatePi.status()).thenReturn(ProcessInstance.STATE_ACTIVE);
when(mockUpdatePi.internalGetProcessInstance()).thenReturn(updatePi);
when(mockUpdatePi.id()).thenReturn(TEST_ID);
when(mockUpdatePi.process()).thenReturn((Process) process);

try {
pi.update(TEST_ID, mockUpdatePi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,37 @@ public boolean tryAdvance(Consumer<? super Record> action) {
private static String sqlIncludingVersion(String statement, String processVersion) {
return statement + " " + (processVersion == null ? PROCESS_VERSION_IS_NULL : PROCESS_VERSION_EQUALS_TO);
}

@Override
long migrate(String processId, String processVersion, String targetProcessId, String targetProcessVersion) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sqlIncludingVersion(Repository.MIGRATE_BULK, processVersion))) {
statement.setString(1, targetProcessId);
statement.setString(2, targetProcessVersion);
statement.setString(3, processId);
if (processVersion != null) {
statement.setString(4, processVersion);
}
return statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s-%s", processId, processVersion);
}
}

@Override
void migrate(String processId, String processVersion, String targetProcessId, String targetProcessVersion, String[] processIds) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sqlIncludingVersion(Repository.MIGRATE_INSTANCE, processVersion))) {
statement.setString(1, targetProcessId);
statement.setString(2, targetProcessVersion);
statement.setObject(3, connection.createArrayOf("VARCHAR", processIds));
statement.setString(4, processId);
if (processVersion != null) {
statement.setString(5, processVersion);
}
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s-%s", processId, processVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

import javax.sql.DataSource;

import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceOptimisticLockingException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +46,7 @@ public class JDBCProcessInstances implements MutableProcessInstances {
public JDBCProcessInstances(Process<?> process, DataSource dataSource, boolean lock) {
this.process = process;
this.lock = lock;
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build();
this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().withDefaultListeners().build();
this.repository = new GenericRepository(dataSource);
}

Expand Down Expand Up @@ -88,6 +88,16 @@ public void update(String id, ProcessInstance instance) {
}
}

@Override
public long migrateAll(String targetProcessId, String targetProcessVersion) {
return repository.migrate(process.id(), process.version(), targetProcessId, targetProcessVersion);
}

@Override
public void migrateProcessInstances(String targetProcessId, String targetProcessVersion, String... processIds) {
repository.migrate(process.id(), process.version(), targetProcessId, targetProcessVersion, processIds);
}

@Override
public void remove(String id) {
LOGGER.debug("Removing process instance id: {}, processId: {}", id, process.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ abstract class Repository {
static final String DELETE = "DELETE FROM process_instances WHERE process_id = ? and id = ?";
static final String PROCESS_VERSION_EQUALS_TO = "and process_version = ?";
static final String PROCESS_VERSION_IS_NULL = "and process_version is null";
static final String MIGRATE_BULK = "UPDATE process_instances SET process_id = ?, process_version = ? WHERE process_id = ? ";
static final String MIGRATE_INSTANCE = "UPDATE process_instances SET process_id = ?, process_version = ? WHERE id = ANY (?) and process_id = ? ";

static class Record {
private final byte[] payload;
Expand Down Expand Up @@ -70,4 +72,8 @@ public Record(byte[] payload, long version) {
protected RuntimeException uncheckedException(Exception ex, String message, Object... param) {
return new RuntimeException(String.format(message, param), ex);
}

abstract long migrate(String id, String version, String targetProcessId, String targetProcessVersion);

abstract void migrate(String id, String version, String targetProcessId, String targetProcessVersion, String[] processIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.kie.persistence.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Optional;

import javax.sql.DataSource;
Expand All @@ -39,6 +42,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_ACTIVE;
import static org.kie.kogito.internal.process.runtime.KogitoProcessInstance.STATE_COMPLETED;
import static org.kie.kogito.test.utils.ProcessInstancesTestUtils.abort;
Expand All @@ -54,6 +58,8 @@ abstract class AbstractProcessInstancesIT {
public static final String TEST_ID = "02ac3854-46ee-42b7-8b63-5186c9889d96";
public static SecurityPolicy securityPolicy = SecurityPolicy.of(IdentityProviders.of("john"));

DataSource dataSource;

public static void initMigration(JdbcDatabaseContainer container, String dbKind) {
Flyway flyway = Flyway.configure().dataSource(container.getJdbcUrl(),
container.getUsername(),
Expand Down Expand Up @@ -211,6 +217,62 @@ public void testUpdate() {
assertEmpty(process.instances());
}

@Test
public void testMigrateAll() throws Exception {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrateAll("migrated", "2");

DataSource dataSource = getDataSource();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances")) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "migrated");
assertEquals(resultSet.getString(2), "2");
}
}
}

@Test
public void testMigrateSingle() throws Exception {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance1 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance1.start();

ProcessInstance<BpmnVariables> processInstance2 = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
processInstance2.start();

process.instances().migrateProcessInstances("migrated", "2", processInstance1.id());

DataSource dataSource = getDataSource();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances WHERE id = '" + processInstance1.id() + "'")) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "migrated");
assertEquals(resultSet.getString(2), "2");
}
}

try (Connection connection = dataSource.getConnection();
ResultSet resultSet = connection.createStatement().executeQuery("SELECT process_id, process_version FROM process_instances WHERE id = '" + processInstance2.id() + "'")) {

while (resultSet.next()) {
assertEquals(resultSet.getString(1), "BPMN2_UserTask");
assertEquals(resultSet.getString(2), "1.0");
}
}

}

@Test
public void testRemove() {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.jbpm.flow.serialization.MarshallerContextName;
import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.Model;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
import org.kie.kogito.process.MutableProcessInstances;
Expand All @@ -37,8 +39,6 @@
import org.kie.kogito.process.ProcessInstanceOptimisticLockingException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.MarshallerContextName;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.ClientSession;
Expand Down Expand Up @@ -69,6 +69,7 @@ public MongoDBProcessInstances(MongoClient mongoClient, org.kie.kogito.process.P
this.collection = Objects.requireNonNull(getCollection(mongoClient, process.id(), dbName));
this.marshaller = ProcessInstanceMarshallerService.newBuilder()
.withDefaultObjectMarshallerStrategies()
.withDefaultListeners()
.withContextEntries(singletonMap(MarshallerContextName.MARSHALLER_FORMAT, MarshallerContextName.MARSHALLER_FORMAT_JSON))
.build();
this.transactionManager = Objects.requireNonNull(transactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

import org.bson.Document;
import org.drools.io.ClassPathResource;
import org.jbpm.flow.serialization.MarshallerContextName;
import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.bpmn2.BpmnProcess;
import org.kie.kogito.process.bpmn2.BpmnVariables;
import org.kie.kogito.serialization.process.MarshallerContextName;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ void testMongoDBPersistenceWithTransaction() {
AbstractProcessInstance mockUpdateProcessInstance = mock(AbstractProcessInstance.class);
when(mockUpdateProcessInstance.status()).thenReturn(ProcessInstance.STATE_ACTIVE);
when(mockUpdateProcessInstance.internalGetProcessInstance()).thenReturn(updatePi);
when(mockUpdateProcessInstance.process()).thenReturn(process);

mongodbInstance.update(id, mockUpdateProcessInstance);
verify(mongoCollection, times(1)).replaceOne(eq(clientSession), eq(Filters.eq(PROCESS_INSTANCE_ID, id)), any());
Expand All @@ -154,6 +155,7 @@ void testMongoDBPersistenceWithTransaction() {
AbstractProcessInstance mockCreateProcessInstance = mock(AbstractProcessInstance.class);
when(mockCreateProcessInstance.status()).thenReturn(ProcessInstance.STATE_ACTIVE);
when(mockCreateProcessInstance.internalGetProcessInstance()).thenReturn(createPi);
when(mockCreateProcessInstance.process()).thenReturn(process);

mongodbInstance.create(id, mockCreateProcessInstance);
verify(mongoCollection, times(1)).insertOne(eq(clientSession), any());
Expand Down
Loading

0 comments on commit 79a0ade

Please sign in to comment.