Skip to content

Commit

Permalink
fix mongodb debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Oct 10, 2023
1 parent 877d88c commit 1149ae6
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 633 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
Expand All @@ -46,8 +46,8 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

static final String ID = "_id";

private MongoCollection<ProcessInstanceStateDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceStateDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;

protected abstract MongoClient mongoClient();

Expand All @@ -67,19 +67,19 @@ protected void configure() {
CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
PojoCodecProvider.builder().automatic(true).build()));
MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceStateDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceStateDataEvent.class).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
if (this.processInstancesEvents() && event instanceof ProcessInstanceStateDataEvent) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceStateDataEvent) event);
if (this.processInstancesEvents() && event instanceof ProcessInstanceDataEvent) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event);
return;
}

if (this.userTasksEvents() && event instanceof UserTaskInstanceStateDataEvent) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceStateDataEvent) event);
if (this.userTasksEvents() && event instanceof UserTaskInstanceDataEvent) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,23 @@
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;

public class EventMongoDBCodecProvider implements CodecProvider {

private static final ProcessInstanceDataEventCodec PROCESS_INSTANCE_DATA_EVENT_CODEC = new ProcessInstanceDataEventCodec();
private static final UserTaskInstanceDataEventCodec USER_TASK_INSTANCE_DATA_EVENT_CODEC = new UserTaskInstanceDataEventCodec();
private static final VariableInstanceDataEventCodec VARIABLE_INSTANCE_DATA_EVENT_CODEC = new VariableInstanceDataEventCodec();

@SuppressWarnings("unchecked")
@Override
public <T> Codec<T> get(Class<T> aClass, CodecRegistry codecRegistry) {
if (aClass == ProcessInstanceStateDataEvent.class) {
if (aClass.isAssignableFrom(ProcessInstanceDataEvent.class)) {
return (Codec<T>) PROCESS_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == UserTaskInstanceStateDataEvent.class) {
if (aClass.isAssignableFrom(UserTaskInstanceDataEvent.class)) {
return (Codec<T>) USER_TASK_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == ProcessInstanceVariableDataEvent.class) {
return (Codec<T>) VARIABLE_INSTANCE_DATA_EVENT_CODEC;
}
return null;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,59 @@
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceStateDataEvent> {
public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceDataEvent> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceDataEventCodec.class);

@Override
public ProcessInstanceStateDataEvent generateIdIfAbsentFromDocument(ProcessInstanceStateDataEvent processInstanceDataEvent) {
public ProcessInstanceDataEvent generateIdIfAbsentFromDocument(ProcessInstanceDataEvent processInstanceDataEvent) {
return processInstanceDataEvent;
}

@Override
public boolean documentHasId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
public boolean documentHasId(ProcessInstanceDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
public BsonValue getDocumentId(ProcessInstanceDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public ProcessInstanceStateDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
public ProcessInstanceDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceStateDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, processInstanceDataEvent);
doc.put("kogitoProcessType", processInstanceDataEvent.getKogitoProcessType());
doc.put("kogitoProcessInstanceVersion", processInstanceDataEvent.getKogitoProcessInstanceVersion());
doc.put("kogitoParentProcessinstanceId", processInstanceDataEvent.getKogitoParentProcessInstanceId());
doc.put("kogitoProcessinstanceState", processInstanceDataEvent.getKogitoProcessInstanceState());
doc.put("kogitoReferenceId", processInstanceDataEvent.getKogitoReferenceId());
doc.put("kogitoStartFromNode", processInstanceDataEvent.getKogitoStartFromNode());
doc.put("kogitoIdentity", processInstanceDataEvent.getKogitoIdentity());
doc.put("data", encodeData(processInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceStateEventBody data) {
Document doc = new Document();
doc.put("id", data.getProcessInstanceId());
doc.put("version", data.getProcessVersion());
doc.put("parentInstanceId", data.getParentInstanceId());
doc.put("rootInstanceId", data.getRootProcessInstanceId());
doc.put("processId", data.getProcessId());
doc.put("processType", data.getProcessType());
doc.put("rootProcessId", data.getRootProcessId());
doc.put("processName", data.getProcessName());
doc.put("eventDate", data.getEventDate());
doc.put("state", data.getState());
doc.put("businessKey", data.getBusinessKey());
doc.put("roles", data.getRoles());
doc.put("identity", data.getEventUser());

return doc;
public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.findAndRegisterModules();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Document document = Document.parse(mapper.writeValueAsString(processInstanceDataEvent));
document.put(CodecUtils.ID, processInstanceDataEvent.getId());
codec().encode(bsonWriter, document, encoderContext);
} catch (JsonProcessingException e) {
LOGGER.error("Could not process json event", e);
}
}

@Override
public Class<ProcessInstanceStateDataEvent> getEncoderClass() {
return ProcessInstanceStateDataEvent.class;
public Class<ProcessInstanceDataEvent> getEncoderClass() {
return ProcessInstanceDataEvent.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,64 +26,59 @@
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class UserTaskInstanceDataEventCodec implements CollectibleCodec<UserTaskInstanceStateDataEvent> {
public class UserTaskInstanceDataEventCodec implements CollectibleCodec<UserTaskInstanceDataEvent> {

private static final Logger LOGGER = LoggerFactory.getLogger(UserTaskInstanceDataEventCodec.class);

@Override
public UserTaskInstanceStateDataEvent generateIdIfAbsentFromDocument(UserTaskInstanceStateDataEvent userTaskInstanceDataEvent) {
public UserTaskInstanceDataEvent<?> generateIdIfAbsentFromDocument(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
return userTaskInstanceDataEvent;
}

@Override
public boolean documentHasId(UserTaskInstanceStateDataEvent userTaskInstanceDataEvent) {
public boolean documentHasId(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
return userTaskInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(UserTaskInstanceStateDataEvent userTaskInstanceDataEvent) {
public BsonValue getDocumentId(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
return new BsonString(userTaskInstanceDataEvent.getId());
}

@Override
public UserTaskInstanceStateDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
public UserTaskInstanceDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, UserTaskInstanceStateDataEvent userTaskInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, userTaskInstanceDataEvent);
doc.put("kogitoUserTaskinstanceId", userTaskInstanceDataEvent.getKogitoUserTaskInstanceId());
doc.put("kogitoUserTaskinstanceState", userTaskInstanceDataEvent.getKogitoUserTaskInstanceState());
doc.put("data", encodeData(userTaskInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(UserTaskInstanceStateEventBody data) {
Document doc = new Document();
doc.put("id", data.getUserTaskInstanceId());
doc.put("taskName", data.getUserTaskName());
doc.put("taskDescription", data.getUserTaskDescription());
doc.put("taskPriority", data.getUserTaskPriority());
doc.put("referenceName", data.getUserTaskReferenceName());
doc.put("eventDate", data.getEventDate());
doc.put("state", data.getState());
doc.put("actualOwner", data.getActualOwner());
doc.put("processInstanceId", data.getProcessInstanceId());
doc.put("identity", data.getEventUser());
return doc;
public void encode(BsonWriter bsonWriter, UserTaskInstanceDataEvent userTaskInstanceDataEvent, EncoderContext encoderContext) {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.findAndRegisterModules();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Document document = Document.parse(mapper.writeValueAsString(userTaskInstanceDataEvent));
document.put(CodecUtils.ID, userTaskInstanceDataEvent.getId());
codec().encode(bsonWriter, document, encoderContext);
} catch (JsonProcessingException e) {
LOGGER.error("Could not process json event", e);
}
}

@Override
public Class<UserTaskInstanceStateDataEvent> getEncoderClass() {
return UserTaskInstanceStateDataEvent.class;
public Class<UserTaskInstanceDataEvent> getEncoderClass() {
return UserTaskInstanceDataEvent.class;
}
}
Loading

0 comments on commit 1149ae6

Please sign in to comment.