Skip to content

Commit

Permalink
Add logging MDC
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanpelikan committed Aug 27, 2024
1 parent 195d677 commit eff63a6
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 34 deletions.
8 changes: 8 additions & 0 deletions spring-boot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ To run Camunda 8 on your local computer for development purposes consider to use
1. [Module aware deployment](#module-aware-deployment)
1. [SPI Binding validation](#spi-binding-validation)
1. [Managing Camunda 8 connectivity](#managing-camunda-8-connectivity)
1. [Logging](#logging)
1. [Multi-instance](#multi-instance)
1. [Message correlation IDs](#message-correlation-ids)
1. [Transaction behavior](#transaction-behavior)
Expand Down Expand Up @@ -136,6 +137,13 @@ vanillabp:
task-timeout: PT3S # overrides vanillabp.workflow-modules.Demo.workflows.DemoWorkflow.adapters.camunda8.task-timeout
```
### Logging
These attributes are set in addition to the [default logging context](https://github.com/vanillabp/spring-boot-support#logging)
(defined in class `io.vanillabp.camunda8.LoggingContext`):

* Tenant-ID - The tenant ID used accessing Camunda 8 cluster.

## Multi-instance

Since Camunda 8 is a remote engine the workflow is processed in a different runtime environment. Due to this fact the Blueprint adapter cannot do the entire binding of multi-instance context information under the hood. In the BPMN the multi-instance aspects like the input element, the element's index and the total number of elements have to be defined according to a certain naming convention:
Expand Down
2 changes: 1 addition & 1 deletion spring-boot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<dependency>
<groupId>io.vanillabp</groupId>
<artifactId>spring-boot-support</artifactId>
<version>1.1.1</version>
<version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.camunda.spring</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,21 @@ public Camunda8TaskHandler camunda8TaskHandler(
final Object bean,
final Method method,
final List<MethodParameter> parameters,
final String idPropertyName) {
final String idPropertyName,
final String tenantId,
final String workflowModuleId,
final String bpmnProcessId) {

return new Camunda8TaskHandler(
taskType,
repository,
bean,
method,
parameters,
idPropertyName);
idPropertyName,
tenantId,
workflowModuleId,
bpmnProcessId);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.vanillabp.camunda8;

import org.slf4j.MDC;

public class LoggingContext extends io.vanillabp.springboot.adapter.LoggingContext {

/**
* The current workflow's tenant. Per default the same value as the workflow module's ID
* but may be overwritten by using Spring Boot properties.
*
* see <a href="https://github.com/camunda-community-hub/vanillabp-camunda8-adapter/tree/main/spring-boot#using-camunda-multi-tenancy">Multi-tenancy</a>
*/
public static final String WORKFLOW_TENANT_ID = "workflowTenantId";

public static void clearContext() {

io.vanillabp.springboot.adapter.LoggingContext.clearContext();

MDC.remove(WORKFLOW_ADAPTER_ID);
MDC.remove(WORKFLOW_AGGREGATE_ID);
MDC.remove(WORKFLOW_BPM_ID);
MDC.remove(WORKFLOW_BPMN_ID);
MDC.remove(WORKFLOW_TASK_NODE);
MDC.remove(WORKFLOW_TASK_ID);
MDC.remove(WORKFLOW_TASK_NODE_ID);
MDC.remove(WORKFLOW_MODULE_ID);
MDC.remove(WORKFLOW_TENANT_ID);

}

public static void setLoggingContext(
final String adapterId,
final String tenantId,
final String workflowModuleId,
final String aggregateId,
final String bpmnId,
final String taskId,
final String bpmId,
final String taskNode,
final String taskNodeId) {

MDC.put(WORKFLOW_ADAPTER_ID, adapterId);
MDC.put(WORKFLOW_AGGREGATE_ID, aggregateId);
MDC.put(WORKFLOW_BPM_ID, bpmId);
MDC.put(WORKFLOW_BPMN_ID, bpmnId);
MDC.put(WORKFLOW_TASK_NODE, taskNode);
MDC.put(WORKFLOW_TASK_ID, taskId);
MDC.put(WORKFLOW_TASK_NODE_ID, taskNodeId);
MDC.put(WORKFLOW_MODULE_ID, workflowModuleId);
MDC.put(WORKFLOW_TENANT_ID, tenantId);

final var context = io.vanillabp.springboot.adapter.LoggingContext.getWriteableContext();
context.put(WORKFLOW_TENANT_ID, tenantId);
context.put(WORKFLOW_ADAPTER_ID, adapterId);
context.put(WORKFLOW_AGGREGATE_ID, aggregateId);
context.put(WORKFLOW_BPM_ID, bpmId);
context.put(WORKFLOW_BPMN_ID, bpmnId);
context.put(WORKFLOW_TASK_NODE, taskNode);
context.put(WORKFLOW_TASK_ID, taskId);
context.put(WORKFLOW_TASK_NODE_ID, taskNodeId);
context.put(WORKFLOW_MODULE_ID, workflowModuleId);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.vanillabp.camunda8.Camunda8AdapterConfiguration;
import io.vanillabp.camunda8.Camunda8VanillaBpProperties;
import io.vanillabp.camunda8.LoggingContext;
import io.vanillabp.springboot.adapter.AdapterAwareProcessService;
import io.vanillabp.springboot.adapter.ProcessServiceImplementation;
import java.time.Duration;
Expand Down Expand Up @@ -102,6 +103,11 @@ public CrudRepository<DE, Object> getWorkflowAggregateRepository() {

}

@Override
public String getPrimaryBpmnProcessId() {
return parent.getPrimaryBpmnProcessId();
}

@Override
public DE startWorkflow(
final DE workflowAggregate) throws Exception {
Expand Down Expand Up @@ -322,34 +328,53 @@ private DE runInTransaction(
final Consumer<DE> runnable,
final String methodSignature) {

// persist to get ID in case of @Id @GeneratedValue
// or force optimistic locking exceptions before running
// the workflow if aggregate was already persisted before
final var attachedAggregate = workflowAggregateRepository
.save(workflowAggregate);

if (TransactionSynchronizationManager.isActualTransactionActive()) {
if (taskIdToTestForAlreadyCompletedOrCancelled != null) {
try {

// persist to get ID in case of @Id @GeneratedValue
// or force optimistic locking exceptions before running
// the workflow if aggregate was already persisted before
final var attachedAggregate = workflowAggregateRepository
.save(workflowAggregate);

final var aggregateId = getWorkflowAggregateId.apply(attachedAggregate);
final var bpmnProcessId = parent.getPrimaryBpmnProcessId();
LoggingContext.setLoggingContext(
Camunda8AdapterConfiguration.ADAPTER_ID,
camunda8Properties.getTenantId(parent.getWorkflowModuleId()),
parent.getWorkflowModuleId(),
aggregateId == null ? null : aggregateId.toString(),
bpmnProcessId,
taskIdToTestForAlreadyCompletedOrCancelled,
null,
null,
null);

if (TransactionSynchronizationManager.isActualTransactionActive()) {
if (taskIdToTestForAlreadyCompletedOrCancelled != null) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
methodSignature,
() -> client
.newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16))
.timeout(Duration.ofMinutes(10))
.send()
.join(5, TimeUnit.MINUTES), // needs to run synchronously
() -> "aggregate: " + aggregateId + "; bpmn-process-id: " + bpmnProcessId));
}
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8TestForTaskAlreadyCompletedOrCancelled(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> client
.newUpdateTimeoutCommand(Long.parseUnsignedLong(taskIdToTestForAlreadyCompletedOrCancelled, 16))
.timeout(Duration.ofMinutes(10))
.send()
.join(5, TimeUnit.MINUTES), // needs to run synchronously
() -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
() -> runnable.accept(attachedAggregate),
() -> "aggregate: " + aggregateId + "; bpmn-process-id: " + bpmnProcessId));
} else {
runnable.accept(attachedAggregate);
}
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> runnable.accept(attachedAggregate),
() -> "aggregate: " + getWorkflowAggregateId.apply(attachedAggregate) + "; bpmn-process-id: " + parent.getPrimaryBpmnProcessId()));
} else {
runnable.accept(attachedAggregate);
}

return attachedAggregate;
return attachedAggregate;

} finally {
LoggingContext.clearContext();
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.client.api.worker.JobHandler;
import io.vanillabp.camunda8.Camunda8AdapterConfiguration;
import io.vanillabp.camunda8.LoggingContext;
import io.vanillabp.camunda8.service.Camunda8TransactionAspect;
import io.vanillabp.camunda8.service.Camunda8TransactionProcessor;
import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type;
Expand Down Expand Up @@ -40,6 +42,12 @@ public class Camunda8TaskHandler extends TaskHandlerBase implements JobHandler,

private final String idPropertyName;

private final String tenantId;

private final String workflowModuleId;

private final String bpmnProcessId;

private ZeebeClient zeebeClient;

public Camunda8TaskHandler(
Expand All @@ -48,11 +56,17 @@ public Camunda8TaskHandler(
final Object bean,
final Method method,
final List<MethodParameter> parameters,
final String idPropertyName) {
final String idPropertyName,
final String tenantId,
final String workflowModuleId,
final String bpmnProcessId) {

super(workflowAggregateRepository, bean, method, parameters);
this.taskType = taskType;
this.idPropertyName = idPropertyName;
this.tenantId = tenantId;
this.workflowModuleId = workflowModuleId;
this.bpmnProcessId = bpmnProcessId;

}

Expand All @@ -79,6 +93,18 @@ public void handle(

try {
final var businessKey = getVariable(job, idPropertyName);
final var taskId = Long.toHexString(job.getKey());

LoggingContext.setLoggingContext(
Camunda8AdapterConfiguration.ADAPTER_ID,
tenantId,
workflowModuleId,
businessKey == null ? null : businessKey.toString(),
bpmnProcessId,
taskId,
Long.toString(job.getProcessInstanceKey()),
job.getBpmnProcessId() + "#" + job.getElementId(),
Long.toString(job.getElementInstanceKey()));

logger.trace("Will handle task '{}' (task-definition '{}‘) of workflow '{}' (instance-id '{}') as job '{}'",
job.getElementId(),
Expand Down Expand Up @@ -134,7 +160,7 @@ public void handle(
param,
() -> {
taskIdRetrieved.set(true);
return Long.toHexString(job.getKey());
return taskId;
}),
(args, param) -> processTaskEventParameter(
args,
Expand Down Expand Up @@ -167,6 +193,7 @@ public void handle(
} finally {
Camunda8TransactionProcessor.unregisterCallbacks();
Camunda8TransactionAspect.unregisterDeferredInTransaction();
LoggingContext.clearContext();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
import io.vanillabp.springboot.adapter.TaskWiringBase;
import io.vanillabp.springboot.parameters.MethodParameter;
import jakarta.persistence.Id;
import org.camunda.bpm.model.xml.instance.ModelElementInstance;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
Expand All @@ -37,6 +33,9 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.camunda.bpm.model.xml.instance.ModelElementInstance;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;

public class Camunda8TaskWiring extends TaskWiringBase<Camunda8Connectable, Camunda8ProcessService<?>, Camunda8MethodParameterFactory>
implements Consumer<ZeebeClient> {
Expand Down Expand Up @@ -241,7 +240,10 @@ protected void connectToBpms(
bean,
method,
parameters,
idPropertyName);
idPropertyName,
tenantId,
workflowModuleId,
processService.getPrimaryBpmnProcessId());
if (this.client != null) {
taskHandler.accept(this.client);
} else {
Expand Down

0 comments on commit eff63a6

Please sign in to comment.