Skip to content

Commit

Permalink
[Fix #3413] Speed up business key query performance
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Mar 14, 2024
1 parent f04c2a3 commit c358036
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,35 @@

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GenericRepository extends Repository {

private static final String PAYLOAD = "payload";
private static final String VERSION = "version";

private static final Logger LOGGER = LoggerFactory.getLogger(GenericRepository.class);

private final DataSource dataSource;

public GenericRepository(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
void insertInternal(String processId, String processVersion, UUID id, byte[] payload) {
void insertInternal(String processId, String processVersion, UUID id, byte[] payload, String businessKey) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(INSERT)) {
statement.setString(1, id.toString());
String processInstanceId = id.toString();
statement.setString(1, processInstanceId);
statement.setBytes(2, payload);
statement.setString(3, processId);
statement.setString(4, processVersion);
statement.setLong(5, 0L);
statement.executeUpdate();
if (businessKey != null) {
try (PreparedStatement businessKeyStmt = connection.prepareStatement(INSERT_BUSINESS_KEY)) {
businessKeyStmt.setString(1, businessKey);
businessKeyStmt.setString(2, processInstanceId);
businessKeyStmt.executeUpdate();
}
}
} catch (Exception e) {
throw uncheckedException(e, "Error inserting process instance %s", id);
}
Expand Down Expand Up @@ -140,6 +143,19 @@ Optional<Record> findByIdInternal(String processId, String processVersion, UUID
return Optional.empty();
}

@Override
Optional<Record> findByBusinessKey(String processId, String processVersion, String businessKey) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(FIND_BY_BUSINESS_KEY)) {
statement.setString(1, businessKey);
try (ResultSet resultSet = statement.executeQuery()) {
return resultSet.next() ? Optional.of(from(resultSet)) : Optional.empty();
}
} catch (Exception e) {
throw uncheckedException(e, "Error finding process instance using business key %s", businessKey);
}
}

private static class CloseableWrapper implements Runnable {

private Deque<AutoCloseable> wrapped = new ArrayDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public boolean exists(String id) {
public void create(String id, ProcessInstance instance) {
LOGGER.debug("Creating process instance id: {}, processId: {}, processVersion: {}", id, process.id(), process.version());
if (isActive(instance)) {
repository.insertInternal(process.id(), process.version(), UUID.fromString(id), marshaller.marshallProcessInstance(instance));
repository.insertInternal(process.id(), process.version(), UUID.fromString(id), marshaller.marshallProcessInstance(instance), instance.businessKey());
} else {
LOGGER.warn("Skipping create of process instance id: {}, state: {}", id, instance.status());
}
Expand Down Expand Up @@ -101,6 +101,12 @@ public Optional<ProcessInstance<?>> findById(String id, ProcessInstanceReadMode
return repository.findByIdInternal(process.id(), process.version(), UUID.fromString(id)).map(r -> unmarshall(r, mode));
}

@Override
public Optional<ProcessInstance<?>> findByBusinessKey(String businessKey, ProcessInstanceReadMode mode) {
LOGGER.debug("Find process instance using business Key : {}", businessKey);
return repository.findByBusinessKey(process.id(), process.version(), businessKey).map(r -> unmarshall(r, mode));
}

@Override
public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode mode) {
LOGGER.debug("Find process instance values using mode: {}", mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
abstract class Repository {

static final String INSERT = "INSERT INTO process_instances (id, payload, process_id, process_version, version) VALUES (?, ?, ?, ?, ?)";
static final String INSERT_BUSINESS_KEY = "INSERT INTO business_key_mapping (name,process_instance_id) VALUES (?,?)";
static final String FIND_ALL = "SELECT payload, version FROM process_instances WHERE process_id = ?";
static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE process_id = ? and id = ?";
static final String FIND_BY_BUSINESS_KEY = "SELECT payload, version FROM process_instances INNER JOIN business_key_mapping ON id = process_instance_id WHERE name = ?";
static final String UPDATE = "UPDATE process_instances SET payload = ? WHERE process_id = ? and id = ?";
static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = ?, version = ? WHERE process_id = ? and id = ? and version = ?";
static final String DELETE = "DELETE FROM process_instances WHERE process_id = ? and id = ?";
Expand All @@ -51,7 +53,7 @@ public Record(byte[] payload, long version) {
}
}

abstract void insertInternal(String processId, String processVersion, UUID id, byte[] payload);
abstract void insertInternal(String processId, String processVersion, UUID id, byte[] payload, String businessKey);

abstract void updateInternal(String processId, String processVersion, UUID id, byte[] payload);

Expand All @@ -61,6 +63,8 @@ public Record(byte[] payload, long version) {

abstract Optional<Record> findByIdInternal(String processId, String processVersion, UUID id);

abstract Optional<Record> findByBusinessKey(String processId, String processVersion, String businessKey);

abstract Stream<Record> findAllInternal(String processId, String processVersion);

protected RuntimeException uncheckedException(Exception ex, String message, Object... param) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE business_key_mapping (
name VARCHAR (255) NOT NULL,
process_instance_id VARCHAR (36) NOT NULL,
CONSTRAINT business_key_primary_key PRIMARY KEY (name),
CONSTRAINT fk_process_instances
FOREIGN KEY (process_instance_id)
REFERENCES process_instances(id)
ON DELETE CASCADE
);

CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping (process_instance_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE business_key_mapping (
name character (255) NOT NULL,
process_instance_id character (36) NOT NULL,
PRIMARY KEY (name),
CONSTRAINT fk_process_instances
FOREIGN KEY (process_instance_id)
REFERENCES process_instances(id)
ON DELETE CASCADE
);


CREATE INDEX idx_business_key_process_instance_id ON business_key_mapping (process_instance_id);
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,14 @@ void testMultipleProcesses() {
@Test
void testBasicFlow() {
var factory = new TestProcessInstancesFactory(getDataSource(), lock());
final String businessKey = "manolo";
BpmnProcess process = createProcess(factory, "BPMN2-UserTask.bpmn2");
ProcessInstance<BpmnVariables> processInstance = process.createInstance(BpmnVariables.create(singletonMap("test",
ProcessInstance<BpmnVariables> processInstance = process.createInstance(businessKey, BpmnVariables.create(singletonMap("test",
"test")));
processInstance.start();

JDBCProcessInstances processInstances = (JDBCProcessInstances) process.instances();
Optional<?> foundOne = processInstances.findById(processInstance.id());
Optional<?> foundOne = processInstances.findByBusinessKey(businessKey);
BpmnProcessInstance instanceOne = (BpmnProcessInstance) foundOne.get();
processInstances.update(processInstance.id(), instanceOne);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ default Optional<ProcessInstance<T>> findById(String id) {
Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);

default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
return findByBusinessKey(id, ProcessInstanceReadMode.READ_ONLY);
}

default Optional<ProcessInstance<T>> findByBusinessKey(String id, ProcessInstanceReadMode mode) {
return stream(mode).filter(pi -> id.equals(pi.businessKey())).findAny();
}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);
Expand Down

0 comments on commit c358036

Please sign in to comment.