Skip to content

Commit

Permalink
Fix: Add tenantId to the batch jobs if required.
Browse files Browse the repository at this point in the history
  • Loading branch information
amporsim committed Oct 16, 2024
1 parent 940bac5 commit b6701e6
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public BatchPartEntity createBatchPart(BatchEntity parentBatch, String status, S
batchPartEntity.setBatchSearchKey(parentBatch.getBatchSearchKey());
batchPartEntity.setBatchSearchKey2(parentBatch.getBatchSearchKey2());
batchPartEntity.setStatus(status);
if (parentBatch.getTenantId() != null) {
batchPartEntity.setTenantId(parentBatch.getTenantId());
}
batchPartEntity.setCreateTime(getClock().getCurrentTime());
insert(batchPartEntity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ public Batch batchMigrateCaseInstancesOfCaseDefinition(String caseDefinitionId,
.searchKey2(caseDefinition.getId())
.status(CaseInstanceBatchMigrationResult.STATUS_IN_PROGRESS)
.batchDocumentJson(document.asJsonString())
.tenantId(caseDefinition.getTenantId())
.create();

JobService jobService = engineConfiguration.getJobServiceConfiguration().getJobService();
Expand All @@ -487,6 +488,7 @@ public Batch batchMigrateCaseInstancesOfCaseDefinition(String caseDefinitionId,

JobEntity job = jobService.createJob();
job.setJobHandlerType(CaseInstanceMigrationJobHandler.TYPE);
job.setTenantId(caseInstance.getTenantId());
job.setScopeId(caseInstance.getId());
job.setScopeType(ScopeTypes.CMMN);
job.setJobHandlerConfiguration(CaseInstanceMigrationJobHandler.getHandlerCfgForBatchPartId(batchPart.getId()));
Expand All @@ -499,6 +501,7 @@ public Batch batchMigrateCaseInstancesOfCaseDefinition(String caseDefinitionId,
TimerJobService timerJobService = engineConfiguration.getJobServiceConfiguration().getTimerJobService();
TimerJobEntity timerJob = timerJobService.createTimerJob();
timerJob.setJobType(JobEntity.JOB_TYPE_TIMER);
timerJob.setTenantId(batch.getTenantId());
timerJob.setRevision(1);
timerJob.setRetries(0);
timerJob.setJobHandlerType(CaseInstanceMigrationStatusJobHandler.TYPE);
Expand Down Expand Up @@ -530,6 +533,7 @@ public Batch batchMigrateHistoricCaseInstancesOfCaseDefinition(String caseDefini
.searchKey2(caseDefinition.getId())
.status(CaseInstanceBatchMigrationResult.STATUS_IN_PROGRESS)
.batchDocumentJson(document.asJsonString())
.tenantId(caseDefinition.getTenantId())
.create();

JobService jobService = engineConfiguration.getJobServiceConfiguration().getJobService();
Expand All @@ -542,6 +546,7 @@ public Batch batchMigrateHistoricCaseInstancesOfCaseDefinition(String caseDefini
job.setScopeId(historicCaseInstance.getId());
job.setScopeType(ScopeTypes.CMMN);
job.setJobHandlerConfiguration(HistoricCaseInstanceMigrationJobHandler.getHandlerCfgForBatchPartId(batchPart.getId()));
job.setTenantId(historicCaseInstance.getTenantId());
jobService.createAsyncJob(job, false);
jobService.scheduleAsyncJob(job);
}
Expand All @@ -554,6 +559,7 @@ public Batch batchMigrateHistoricCaseInstancesOfCaseDefinition(String caseDefini
timerJob.setJobHandlerType(CaseInstanceMigrationStatusJobHandler.TYPE);
timerJob.setJobHandlerConfiguration(HistoricCaseInstanceMigrationJobHandler.getHandlerCfgForBatchId(batch.getId()));
timerJob.setScopeType(ScopeTypes.CMMN);
timerJob.setTenantId(batch.getTenantId());

BusinessCalendar businessCalendar = engineConfiguration.getBusinessCalendarManager().getBusinessCalendar(CycleBusinessCalendar.NAME);
timerJob.setDuedate(businessCalendar.resolveDuedate(engineConfiguration.getBatchStatusTimeCycleConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,17 @@ protected CaseDefinition deployCaseDefinition(String name, String path) {
.singleResult();
}

protected CaseDefinition deployCaseDefinition(String name, String path, String tenantId) {
CmmnDeployment deployment = cmmnRepositoryService.createDeployment()
.name(name)
.addClasspathResource(path)
.tenantId(tenantId)
.deploy();

return cmmnRepositoryService.createCaseDefinitionQuery()
.deploymentId(deployment.getId())
.caseDefinitionTenantId(tenantId)
.singleResult();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,82 @@ void testCaseInstanceBatchMigrationSuccess() {
cmmnManagementService.deleteBatch(batch.getId());
}

@Test
void testCaseInstanceBatchMigrationWithTenant() {
// GIVEN
CaseDefinition sourceCaseDefinition = deployCaseDefinition("test1", "org/flowable/cmmn/test/migration/one-task.cmmn.xml", "acme");
CaseInstance caseInstance1 = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testCase").tenantId("acme").start();
CaseInstance caseInstance2 = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testCase").tenantId("acme").start();
CaseDefinition destinationDefinition = deployCaseDefinition("test1", "org/flowable/cmmn/test/migration/two-task.cmmn.xml", "acme");

CaseInstanceMigrationDocumentBuilder migrationDoc = new CaseInstanceMigrationDocumentBuilderImpl()
.setCaseDefinitionToMigrateTo(destinationDefinition.getId())
.addActivatePlanItemDefinitionMapping(PlanItemDefinitionMappingBuilder.createActivatePlanItemDefinitionMappingFor("humanTask2"));

Batch batch = cmmnMigrationService.createCaseInstanceMigrationBuilderFromCaseInstanceMigrationDocument(migrationDoc.build())
.batchMigrateCaseInstances(sourceCaseDefinition.getId());

assertThat(CmmnJobTestHelper.areJobsAvailable(cmmnManagementService)).isTrue();

CaseInstanceBatchMigrationResult migrationResultPriorProcessing = cmmnMigrationService.getResultsOfBatchCaseInstanceMigration(batch.getId());

// assert created migration result and parts
assertThat(migrationResultPriorProcessing).isNotNull();
assertThat(migrationResultPriorProcessing.getBatchId()).isEqualTo(batch.getId());
assertThat(migrationResultPriorProcessing.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_IN_PROGRESS);
assertThat(migrationResultPriorProcessing.getCompleteTime()).isNull();
assertThat(migrationResultPriorProcessing.getAllMigrationParts()).hasSize(2);
assertThat(migrationResultPriorProcessing.getWaitingMigrationParts()).hasSize(2);
assertThat(migrationResultPriorProcessing.getSuccessfulMigrationParts()).isEmpty();
assertThat(migrationResultPriorProcessing.getFailedMigrationParts()).isEmpty();

for (CaseInstanceBatchMigrationPartResult part : migrationResultPriorProcessing.getAllMigrationParts()) {
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_WAITING);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_WAITING);
}

// WHEN
// We are manually executing because on SQL Server on our CI there is a deadlock exception from SQL Server when multiple threads run
for (Job job : cmmnManagementService.createJobQuery().handlerType(CaseInstanceMigrationJobHandler.TYPE).jobTenantId("acme").list()) {
cmmnManagementService.executeJob(job.getId());
}
assertThat(CmmnJobTestHelper.areJobsAvailable(cmmnManagementService)).isFalse();
executeMigrationJobStatusHandlerTimerJob();

// THEN
CaseInstanceBatchMigrationResult migrationResult = cmmnMigrationService.getResultsOfBatchCaseInstanceMigration(batch.getId());
assertThat(migrationResult).isNotNull();
assertThat(migrationResult.getBatchId()).isEqualTo(batch.getId());
assertThat(migrationResult.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);

CaseInstance caseInstance1AfterMigration = cmmnRuntimeService.createCaseInstanceQuery()
.caseInstanceId(caseInstance1.getId())
.caseInstanceTenantId("acme")
.singleResult();
CaseInstance caseInstance2AfterMigration = cmmnRuntimeService.createCaseInstanceQuery()
.caseInstanceId(caseInstance2.getId())
.caseInstanceTenantId("acme")
.singleResult();

for (CaseInstanceBatchMigrationPartResult part : migrationResult.getAllMigrationParts()) {
assertThat(part.getStatus()).isEqualTo(CaseInstanceBatchMigrationResult.STATUS_COMPLETED);
assertThat(part.getResult()).isEqualTo(CaseInstanceBatchMigrationResult.RESULT_SUCCESS);
}

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance1.getId()).jobTenantId("acme").list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance1.getId()).jobTenantId("acme").list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance1.getId()).jobTenantId("acme").list()).hasSize(0);

assertThat(cmmnManagementService.createJobQuery().scopeId(caseInstance2.getId()).jobTenantId("acme").list()).hasSize(0);
assertThat(cmmnManagementService.createTimerJobQuery().scopeId(caseInstance2.getId()).jobTenantId("acme").list()).hasSize(0);
assertThat(cmmnManagementService.createDeadLetterJobQuery().scopeId(caseInstance2.getId()).jobTenantId("acme").list()).hasSize(0);

assertAfterMigrationState(caseInstance1, destinationDefinition, caseInstance1AfterMigration, 2);
assertAfterMigrationState(caseInstance2, destinationDefinition, caseInstance2AfterMigration, 2);

cmmnManagementService.deleteBatch(batch.getId());
}

@Test
void testCaseInstanceBatchMigrationWithError() {
// GIVEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.flowable.batch.api.Batch;
import org.flowable.batch.api.BatchBuilder;
import org.flowable.batch.api.BatchPart;
import org.flowable.batch.api.BatchService;
import org.flowable.bpmn.model.Activity;
Expand Down Expand Up @@ -332,13 +333,16 @@ public Batch batchMigrateProcessInstancesOfProcessDefinition(String sourceProcDe
new ProcessInstanceQueryImpl(commandContext, processEngineConfiguration).processDefinitionId(sourceProcDefId));

BatchService batchService = processEngineConfiguration.getBatchServiceConfiguration().getBatchService();
Batch batch = batchService.createBatchBuilder().batchType(Batch.PROCESS_MIGRATION_TYPE)
BatchBuilder batchBuilder = batchService.createBatchBuilder().batchType(Batch.PROCESS_MIGRATION_TYPE)
.searchKey(sourceProcDefId)
.searchKey2(targetProcessDefinition.getId())
.status(ProcessInstanceBatchMigrationResult.STATUS_IN_PROGRESS)
.batchDocumentJson(document.asJsonString())
.create();

.batchDocumentJson(document.asJsonString());
if (targetProcessDefinition.getTenantId() != null) {
batchBuilder.tenantId(targetProcessDefinition.getTenantId());
}
Batch batch = batchBuilder.create();

JobService jobService = processEngineConfiguration.getJobServiceConfiguration().getJobService();
for (ProcessInstance processInstance : processInstances) {
BatchPart batchPart = batchService.createBatchPart(batch, ProcessInstanceBatchMigrationResult.STATUS_WAITING,
Expand All @@ -348,6 +352,7 @@ public Batch batchMigrateProcessInstancesOfProcessDefinition(String sourceProcDe
job.setJobHandlerType(ProcessInstanceMigrationJobHandler.TYPE);
job.setProcessInstanceId(processInstance.getId());
job.setJobHandlerConfiguration(ProcessInstanceMigrationJobHandler.getHandlerCfgForBatchPartId(batchPart.getId()));
job.setTenantId(processInstance.getTenantId());
jobService.createAsyncJob(job, false);
job.setRetries(0);
jobService.scheduleAsyncJob(job);
Expand All @@ -361,6 +366,7 @@ public Batch batchMigrateProcessInstancesOfProcessDefinition(String sourceProcDe
timerJob.setRetries(0);
timerJob.setJobHandlerType(ProcessInstanceMigrationStatusJobHandler.TYPE);
timerJob.setJobHandlerConfiguration(ProcessInstanceMigrationJobHandler.getHandlerCfgForBatchId(batch.getId()));
timerJob.setTenantId(batch.getTenantId());

BusinessCalendar businessCalendar = processEngineConfiguration.getBusinessCalendarManager().getBusinessCalendar(CycleBusinessCalendar.NAME);
timerJob.setDuedate(businessCalendar.resolveDuedate(processEngineConfiguration.getBatchStatusTimeCycleConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,21 @@ protected ProcessDefinition deployProcessDefinition(String name, String path) {
return processDefinition;
}

protected ProcessDefinition deployProcessDefinition(String name, String path, String tenantId) {
Deployment deployment = repositoryService.createDeployment()
.name(name)
.addClasspathResource(path)
.tenantId(tenantId)
.deploy();

ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
.deploymentId(deployment.getId())
.processDefinitionTenantId(tenantId)
.singleResult();

return processDefinition;
}

protected void completeProcessInstanceTasks(String processInstanceId) {
List<Task> tasks;
do {
Expand Down
Loading

0 comments on commit b6701e6

Please sign in to comment.