Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kie-issues-249] Data index improvements #3241

Merged
merged 5 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
package org.kie.kogito.events.mongodb;

import java.util.Collection;
import java.util.function.BooleanSupplier;

import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
Expand All @@ -50,7 +48,6 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<VariableInstanceDataEvent> variableInstanceDataEventCollection;

protected abstract MongoClient mongoClient();

Expand All @@ -60,54 +57,47 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

protected abstract boolean userTasksEvents();

protected abstract boolean variablesEvents();

protected abstract String eventsDatabaseName();

protected abstract String processInstancesEventsCollection();

protected abstract String userTasksEventsCollection();

protected abstract String variablesEventsCollection();

protected void configure() {
CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
PojoCodecProvider.builder().automatic(true).build()));
MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), VariableInstanceDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event, this::processInstancesEvents);
break;
case "UserTaskInstanceEvent":
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event, this::userTasksEvents);
break;
case "VariableInstanceEvent":
publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event, this::variablesEvents);
break;
default:
logger.warn("Unknown type of event '{}', ignoring", event.getType());
if (this.processInstancesEvents() && event instanceof ProcessInstanceDataEvent) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event);
return;
}

if (this.userTasksEvents() && event instanceof UserTaskInstanceDataEvent) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event);
return;
}

logger.debug("Unknown type of event '{}', ignoring", event.getType());

}

private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event, BooleanSupplier enabled) {
if (enabled.getAsBoolean()) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}
private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,22 @@
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;

public class EventMongoDBCodecProvider implements CodecProvider {

private static final ProcessInstanceDataEventCodec PROCESS_INSTANCE_DATA_EVENT_CODEC = new ProcessInstanceDataEventCodec();
private static final UserTaskInstanceDataEventCodec USER_TASK_INSTANCE_DATA_EVENT_CODEC = new UserTaskInstanceDataEventCodec();
private static final VariableInstanceDataEventCodec VARIABLE_INSTANCE_DATA_EVENT_CODEC = new VariableInstanceDataEventCodec();

@SuppressWarnings("unchecked")
@Override
public <T> Codec<T> get(Class<T> aClass, CodecRegistry codecRegistry) {
if (aClass == ProcessInstanceDataEvent.class) {
if (ProcessInstanceDataEvent.class.isAssignableFrom(aClass)) {
return (Codec<T>) PROCESS_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == UserTaskInstanceDataEvent.class) {
if (UserTaskInstanceDataEvent.class.isAssignableFrom(aClass)) {
return (Codec<T>) USER_TASK_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == VariableInstanceDataEvent.class) {
return (Codec<T>) VARIABLE_INSTANCE_DATA_EVENT_CODEC;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.kie.kogito.events.mongodb.codec;

import java.util.stream.Collectors;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand All @@ -29,13 +27,19 @@
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceEventBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceDataEvent> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceDataEventCodec.class);

@Override
public ProcessInstanceDataEvent generateIdIfAbsentFromDocument(ProcessInstanceDataEvent processInstanceDataEvent) {
return processInstanceDataEvent;
Expand All @@ -61,76 +65,16 @@ public ProcessInstanceDataEvent decode(BsonReader bsonReader, DecoderContext dec

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, processInstanceDataEvent);
doc.put("kogitoProcessType", processInstanceDataEvent.getKogitoProcessType());
doc.put("kogitoProcessInstanceVersion", processInstanceDataEvent.getKogitoProcessInstanceVersion());
doc.put("kogitoParentProcessinstanceId", processInstanceDataEvent.getKogitoParentProcessInstanceId());
doc.put("kogitoProcessinstanceState", processInstanceDataEvent.getKogitoProcessInstanceState());
doc.put("kogitoReferenceId", processInstanceDataEvent.getKogitoReferenceId());
doc.put("kogitoStartFromNode", processInstanceDataEvent.getKogitoStartFromNode());
doc.put("kogitoIdentity", processInstanceDataEvent.getKogitoIdentity());
doc.put("data", encodeData(processInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceEventBody data) {
Document doc = new Document();
doc.put("id", data.getId());
doc.put("version", data.getVersion());
doc.put("parentInstanceId", data.getParentInstanceId());
doc.put("rootInstanceId", data.getRootInstanceId());
doc.put("processId", data.getProcessId());
doc.put("processType", data.getProcessType());
doc.put("rootProcessId", data.getRootProcessId());
doc.put("processName", data.getProcessName());
doc.put("startDate", data.getStartDate());
doc.put("endDate", data.getEndDate());
doc.put("state", data.getState());
doc.put("businessKey", data.getBusinessKey());
doc.put("roles", data.getRoles());
doc.put("identity", data.getIdentity());

if (data.getVariables() != null) {
doc.put("variables", new Document(data.getVariables()));
try {
ObjectMapper mapper = new ObjectMapper();
mapper.findAndRegisterModules();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Document document = Document.parse(mapper.writeValueAsString(processInstanceDataEvent));
document.put(CodecUtils.ID, processInstanceDataEvent.getId());
codec().encode(bsonWriter, document, encoderContext);
} catch (JsonProcessingException e) {
LOGGER.error("Could not process json event", e);
}

if (data.getNodeInstances() != null) {
doc.put("nodeInstances",
data.getNodeInstances().stream().map(ni -> {
Document niDoc = new Document();
niDoc.put("id", ni.getId());
niDoc.put("nodeId", ni.getNodeId());
niDoc.put("nodeDefinitionId", ni.getNodeDefinitionId());
niDoc.put("nodeName", ni.getNodeName());
niDoc.put("nodeType", ni.getNodeType());
niDoc.put("triggerTime", ni.getTriggerTime());
if (ni.getLeaveTime() != null) {
niDoc.put("leaveTime", ni.getLeaveTime());
}
return niDoc;
}).collect(Collectors.toSet()));
}

if (data.getError() != null) {
Document eDoc = new Document();
eDoc.put("errorMessage", data.getError().getErrorMessage());
eDoc.put("nodeDefinitionId", data.getError().getNodeDefinitionId());
doc.put("error", eDoc);
}

if (data.getMilestones() != null) {
doc.put("milestones",
data.getMilestones().stream().map(m -> {
Document mDoc = new Document();
mDoc.put("id", m.getId());
mDoc.put("name", m.getName());
mDoc.put("status", m.getStatus());
return mDoc;
}).collect(Collectors.toSet()));
}

return doc;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.kie.kogito.events.mongodb.codec;

import java.util.stream.Collectors;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand All @@ -28,16 +26,22 @@
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class UserTaskInstanceDataEventCodec implements CollectibleCodec<UserTaskInstanceDataEvent> {

private static final Logger LOGGER = LoggerFactory.getLogger(UserTaskInstanceDataEventCodec.class);

@Override
public UserTaskInstanceDataEvent generateIdIfAbsentFromDocument(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
public UserTaskInstanceDataEvent<?> generateIdIfAbsentFromDocument(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
return userTaskInstanceDataEvent;
}

Expand All @@ -61,64 +65,16 @@ public UserTaskInstanceDataEvent decode(BsonReader bsonReader, DecoderContext de

@Override
public void encode(BsonWriter bsonWriter, UserTaskInstanceDataEvent userTaskInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, userTaskInstanceDataEvent);
doc.put("kogitoUserTaskinstanceId", userTaskInstanceDataEvent.getKogitoUserTaskinstanceId());
doc.put("kogitoUserTaskinstanceState", userTaskInstanceDataEvent.getKogitoUserTaskinstanceState());
doc.put("data", encodeData(userTaskInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(UserTaskInstanceEventBody data) {
Document doc = new Document();
doc.put("id", data.getId());
doc.put("taskName", data.getTaskName());
doc.put("taskDescription", data.getTaskDescription());
doc.put("taskPriority", data.getTaskPriority());
doc.put("referenceName", data.getReferenceName());
doc.put("startDate", data.getStartDate());
doc.put("completeDate", data.getCompleteDate());
doc.put("state", data.getState());
doc.put("actualOwner", data.getActualOwner());
doc.put("potentialUsers", data.getPotentialUsers());
doc.put("potentialGroups", data.getPotentialGroups());
doc.put("excludedUsers", data.getExcludedUsers());
doc.put("adminUsers", data.getAdminUsers());
doc.put("adminGroups", data.getAdminGroups());
doc.put("inputs", new Document(data.getInputs()));
doc.put("outputs", new Document(data.getOutputs()));
doc.put("processInstanceId", data.getProcessInstanceId());
doc.put("rootProcessInstanceId", data.getRootProcessInstanceId());
doc.put("processId", data.getProcessId());
doc.put("rootProcessId", data.getRootProcessId());
doc.put("identity", data.getIdentity());

if (data.getComments() != null) {
doc.put("comments",
data.getComments().stream().map(c -> {
Document cDoc = new Document();
cDoc.put("id", c.getId());
cDoc.put("content", c.getContent());
cDoc.put("updatedAt", c.getUpdatedAt());
cDoc.put("updatedBy", c.getUpdatedBy());
return cDoc;
}).collect(Collectors.toSet()));
}

if (data.getAttachments() != null) {
doc.put("attachments",
data.getAttachments().stream().map(a -> {
Document aDoc = new Document();
aDoc.put("id", a.getId());
aDoc.put("content", a.getContent());
aDoc.put("updatedAt", a.getUpdatedAt());
aDoc.put("updatedBy", a.getUpdatedBy());
aDoc.put("name", a.getName());
return aDoc;
}).collect(Collectors.toSet()));
try {
ObjectMapper mapper = new ObjectMapper();
mapper.findAndRegisterModules();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Document document = Document.parse(mapper.writeValueAsString(userTaskInstanceDataEvent));
document.put(CodecUtils.ID, userTaskInstanceDataEvent.getId());
codec().encode(bsonWriter, document, encoderContext);
} catch (JsonProcessingException e) {
LOGGER.error("Could not process json event", e);
}

return doc;
}

@Override
Expand Down
Loading