Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow sweeper fix #49

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.netflix.conductor.service.ExecutionLockService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class OrkesWorkflowSweeper extends LifecycleAwareComponent {
private final MetricsCollector metricsCollector;
private final SystemTaskRegistry systemTaskRegistry;

private final ExecutionLockService executionLockService;

public OrkesWorkflowSweeper(
@Qualifier(SWEEPER_EXECUTOR_NAME) Executor sweeperExecutor,
QueueDAO queueDAO,
Expand All @@ -66,14 +69,16 @@ public OrkesWorkflowSweeper(
MetricsCollector metricsCollector,
SystemTaskRegistry systemTaskRegistry,
ConductorProperties properties,
OrkesSweeperProperties sweeperProperties) {
OrkesSweeperProperties sweeperProperties,
ExecutionLockService executionLockService) {
this.queueDAO = queueDAO;
this.executionDAO = executionDAO;
this.metricsCollector = metricsCollector;
this.systemTaskRegistry = systemTaskRegistry;
this.properties = properties;
this.sweeperProperties = sweeperProperties;
this.workflowExecutor = workflowExecutor;
this.executionLockService = executionLockService;
log.info("Initializing sweeper with {} threads", properties.getSweeperThreadCount());
for (int i = 0; i < properties.getSweeperThreadCount(); i++) {
sweeperExecutor.execute(this::pollAndSweep);
Expand Down Expand Up @@ -130,8 +135,12 @@ private boolean shouldTaskExistInQueue(TaskModel task) {
public void sweep(String workflowId) {
try {
log.info("Running sweeper for workflow {}", workflowId);
// 1. Run decide on the workflow
WorkflowModel workflow = decideAndRemove(workflowId);
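// Run the operations below while holding the execution lock for this workflow.
// NOTE(review): the early returns after acquireLock (and any thrown exception) appear to skip
// releaseLock - consider releasing in a finally block; confirm against the full file.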
if (!executionLockService.acquireLock(workflowId)) {
return;
}
WorkflowModel workflow = executionDAO.getWorkflow(workflowId);
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
return;
}
Expand Down Expand Up @@ -161,20 +170,24 @@ public void sweep(String workflowId) {
"Workflow {} doesn't have an open pending task, requires force evaluation",
workflow.getWorkflowId());
forceSetLastTaskAsNotExecuted(workflow);
workflow = decideAndRemove(workflowId);
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
log.warn("Removing from decider after repair is done {}", (workflow == null ? null: workflow.getStatus()));
queueDAO.remove(DECIDER_QUEUE, workflowId);
executionLockService.releaseLock(workflowId);
return;
}
log.debug(
"Force evaluation result for workflow {} - {}",
workflowId,
workflow.getStatus());
if (workflow == null || workflow.getStatus().isTerminal()) {
return;
}
}
// 3. If parent workflow exists, call repair on that too - meaning ensure the parent is
// in the decider queue
if (workflow.getParentWorkflowId() != null) {
ensureWorkflowExistsInDecider(workflow.getParentWorkflowId());
}
executionLockService.releaseLock(workflowId);
} catch (NotFoundException e) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
log.info("Workflow NOT found for id:{}. Removed it from decider queue", workflowId, e);
Expand All @@ -197,7 +210,7 @@ private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) {
taskModel.getTaskId(),
taskModel.getWorkflowInstanceId());
taskModel.setExecuted(false);
executionDAO.updateTask(taskModel);
executionDAO.updateWorkflow(workflow);
}
}

Expand All @@ -210,13 +223,13 @@ private List<TaskModel> getAllPendingTasks(WorkflowModel workflow) {
return Collections.emptyList();
}

private WorkflowModel decideAndRemove(String workflowId) {
WorkflowModel workflowModel = workflowExecutor.decide(workflowId);
private WorkflowModel decideAndRemove(WorkflowModel workflow) {
WorkflowModel workflowModel = workflowExecutor.decide(workflow);
if (workflowModel == null) {
return null;
}
if (workflowModel.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
queueDAO.remove(DECIDER_QUEUE, workflowModel.getWorkflowId());
}
return workflowModel;
}
Expand Down
Loading