Skip to content

Commit

Permalink
[incubator-kie-issues-916] add process event data state change
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Mar 1, 2024
1 parent 76f8e41 commit 953d96a
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ void fireAfterVariableChanged(String name, String id, Object oldValue, Object ne

void fireOnMessage(KogitoProcessInstance instance, KogitoNodeInstance nodeInstance, KieRuntime kruntime, String messageName, Object messageObject);

void fireOnMigration(KogitoProcessInstance processInstance, KieRuntime runtime);

// user tasks events

void fireOneUserTaskStateChange(
Expand Down Expand Up @@ -166,4 +168,4 @@ void fireOnUserTaskCommentAdded(

void removeEventListener(KogitoProcessEventListener listener);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);


default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
}

default void migrate(String targetProcessId, String targetProcessVersion, String[] processIds) throws OperationNotSupportedException {
throw new OperationNotSupportedException();
Expand All @@ -49,5 +49,4 @@ default Stream<ProcessInstance<T>> stream() {
return stream(ProcessInstanceReadMode.READ_ONLY);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ <T extends Model> ProcessInstance<T> createProcessInstance(Process<T> process, S

<T extends MappableToModel<R>, R> Optional<R> findById(Process<T> process, String id);

<T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion, String ...id) throws OperationNotSupportedException;
<T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion, String... id) throws OperationNotSupportedException;

<T> void migrate(Process<T> process, String targetProcessId, String targetProcessVersion) throws OperationNotSupportedException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ProcessInstanceStateEventBody {

public static final int EVENT_TYPE_STARTED = 1;
public static final int EVENT_TYPE_ENDED = 2;
public static final int EVENT_TYPE_MIGRATED = 3;

// common fields for events
private Date eventDate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
import org.kie.api.event.process.ProcessMigrationEvent;
import org.kie.api.event.process.ProcessNodeEvent;
import org.kie.api.event.process.ProcessNodeLeftEvent;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
Expand Down Expand Up @@ -117,6 +118,8 @@ private void addDataEvent(ProcessEvent event) {
handleProcesssNodeEvent((SLAViolatedEvent) event);
} else if (event instanceof ProcessVariableChangedEvent) {
handleProcessVariableEvent((ProcessVariableChangedEvent) event);
} else if (event instanceof ProcessMigrationEvent) {
handleProcessStateEvent((ProcessMigrationEvent) event);
}
}

Expand Down Expand Up @@ -293,6 +296,10 @@ private void handleProcessStateEvent(ProcessStartedEvent event) {

}

private void handleProcessStateEvent(ProcessMigrationEvent event) {
processedEvents.add(toProcessInstanceStateEvent(event, ProcessInstanceStateEventBody.EVENT_TYPE_MIGRATED));
}

private ProcessInstanceStateDataEvent toProcessInstanceStateEvent(ProcessEvent event, int eventType) {
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.jbpm.flow.migration.MigrationPlanService;
import org.jbpm.flow.serialization.ProcessInstanceMarshallerListener;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,13 +39,14 @@ public StandardMigrationProcessInstanceMarshallerListener() {
}

@Override
public void afterUnmarshallProcess(KogitoWorkflowProcessInstance processInstance) {
public void afterUnmarshallProcess(KogitoProcessRuntime runtime, KogitoWorkflowProcessInstance processInstance) {
LOGGER.debug("Migration processInstance {}", processInstance);
migrationPlanService.migrateProcessElement(processInstance);
runtime.getProcessEventSupport().fireOnMigration(processInstance, runtime.getKieRuntime());
}

@Override
public void afterUnmarshallNode(KogitoNodeInstance nodeInstance) {
public void afterUnmarshallNode(KogitoProcessRuntime runtime, KogitoNodeInstance nodeInstance) {
LOGGER.debug("Migration nodeInstance {}", nodeInstance);
migrationPlanService.migrateNodeElement(nodeInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ public void fireOnMessage(final KogitoProcessInstance instance, KogitoNodeInstan
notifyAllListeners(l -> l.onMessage(event));
}

@Override
public void fireOnMigration(final KogitoProcessInstance processInstance, KieRuntime kruntime) {
ProcessMigrationEventImpl event = new ProcessMigrationEventImpl(processInstance, kruntime, identityProvider.getName());
notifyAllListeners(l -> l.onMigration(event));
}

// users tasks events
@Override
public void fireOnUserTaskNotStartedDeadline(KogitoProcessInstance instance,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.process.instance.event;

import org.kie.api.event.process.ProcessMigrationEvent;
import org.kie.api.runtime.KieRuntime;
import org.kie.api.runtime.process.ProcessInstance;

public class ProcessMigrationEventImpl extends ProcessEvent implements ProcessMigrationEvent {

private static final long serialVersionUID = 510l;

public ProcessMigrationEventImpl(ProcessInstance instance, KieRuntime kruntime, String identity) {
super(instance, kruntime, identity);
}

@Override
public String toString() {
return "==>[ProcessMigratedEventImpl(name=" + getProcessInstance().getProcessName() + "; id=" + getProcessInstance().getProcessId() + ")]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ protected void registerListeners() {

}

public KogitoProcessRuntime getProcessRuntime() {
return this.processRuntime;
}

@Override
public void activate() {
if (this.activated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@
package org.jbpm.flow.serialization;

import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;

public interface ProcessInstanceMarshallerListener {

default void afterUnmarshallProcess(KogitoWorkflowProcessInstance processInstance) {
default void afterUnmarshallProcess(KogitoProcessRuntime runtime, KogitoWorkflowProcessInstance processInstance) {

}

default void afterUnmarshallNode(KogitoNodeInstance node) {
default void afterUnmarshallNode(KogitoProcessRuntime runtime, KogitoNodeInstance node) {

}

default void beforeMarshallNode(KogitoNodeInstance node) {
default void beforeMarshallNode(KogitoProcessRuntime runtime, KogitoNodeInstance node) {

}

default void beforeMarshallProcess(KogitoWorkflowProcessInstance node) {
default void beforeMarshallProcess(KogitoProcessRuntime runtime, KogitoWorkflowProcessInstance node) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.kie.api.definition.process.WorkflowElementIdentifier;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstanceContainer;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.process.workitem.Attachment;
import org.kie.kogito.process.workitem.Comment;
Expand Down Expand Up @@ -229,7 +230,8 @@ private RuleFlowProcessInstance buildWorkflow(KogitoProcessInstanceProtobuf.Proc
if (workflowContext.getIterationLevelsCount() > 0) {
processInstance.getIterationLevels().putAll(buildIterationLevels(workflowContext.getIterationLevelsList()));
}
Arrays.stream(listeners).forEach(e -> e.afterUnmarshallProcess(processInstance));
KogitoProcessRuntime runtime = ((AbstractProcess<?>) context.get(MarshallerContextName.MARSHALLER_PROCESS)).getProcessRuntime();
Arrays.stream(listeners).forEach(e -> e.afterUnmarshallProcess(runtime, processInstance));
return processInstance;
}

Expand Down Expand Up @@ -317,7 +319,8 @@ protected NodeInstanceImpl buildNodeInstance(KogitoTypesProtobuf.NodeInstance no
}

KogitoNodeInstance kogitoNodeInstance = (KogitoNodeInstance) result;
Arrays.stream(listeners).forEach(e -> e.afterUnmarshallNode(kogitoNodeInstance));
KogitoProcessRuntime runtime = ((AbstractProcess<?>) context.get(MarshallerContextName.MARSHALLER_PROCESS)).getProcessRuntime();
Arrays.stream(listeners).forEach(e -> e.afterUnmarshallNode(runtime, kogitoNodeInstance));
return result;
} catch (IOException e) {
throw new IllegalArgumentException("Cannot read node instance content");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@
import org.kie.api.definition.process.WorkflowElementIdentifier;
import org.kie.api.runtime.process.NodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.internal.process.runtime.KogitoWorkItem;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.process.workitem.Attachment;
import org.kie.kogito.process.workitem.Comment;
import org.kie.kogito.process.workitem.HumanTaskWorkItem;
Expand All @@ -109,7 +111,8 @@ public ProtobufProcessInstanceWriter(MarshallerWriterContext context) {
}

public void writeProcessInstance(WorkflowProcessInstanceImpl workFlow, OutputStream os) throws IOException {
Arrays.stream(listeners).forEach(e -> e.beforeMarshallProcess(workFlow));
KogitoProcessRuntime runtime = ((AbstractProcess<?>) context.get(MarshallerContextName.MARSHALLER_PROCESS)).getProcessRuntime();
Arrays.stream(listeners).forEach(e -> e.beforeMarshallProcess(runtime, workFlow));

KogitoProcessInstanceProtobuf.ProcessInstance.Builder instance = KogitoProcessInstanceProtobuf.ProcessInstance.newBuilder()
.setId(workFlow.getStringId())
Expand Down Expand Up @@ -248,7 +251,8 @@ private List<org.jbpm.flow.serialization.protobuf.KogitoTypesProtobuf.NodeInstan
}

private Any buildNodeInstanceContent(NodeInstance nodeInstance) {
Arrays.stream(listeners).forEach(e -> e.beforeMarshallNode((KogitoNodeInstance) nodeInstance));
KogitoProcessRuntime runtime = ((AbstractProcess<?>) context.get(MarshallerContextName.MARSHALLER_PROCESS)).getProcessRuntime();
Arrays.stream(listeners).forEach(e -> e.beforeMarshallNode(runtime, (KogitoNodeInstance) nodeInstance));

if (nodeInstance instanceof RuleSetNodeInstance) {
return buildRuleSetNodeInstance((RuleSetNodeInstance) nodeInstance);
Expand Down

0 comments on commit 953d96a

Please sign in to comment.