Fix running tasks in non coordinator when former coordinator re-joining the cluster

When the cluster coordinator becomes unresponsive (but is not shut down), its
task scheduler keeps running even after the node is added back to the cluster
as an ordinary member.
The fix is to detect a rejoining member, delete the tasks already assigned to
it, and then restart the task scheduler.

Fixes: #3155
malakaganga committed Mar 5, 2024
1 parent a8a2ddb commit 6cb5f52
Showing 1 changed file with 22 additions and 0 deletions.
@@ -74,6 +74,18 @@ public void memberAdded(NodeDetail nodeDetail) {
                LOG.error("Exception occurred while resolving un assigned tasks upon member addition " + nodeDetail
                        .getNodeId(), e);
            }
        } else if (clusterCoordinator.getThisNodeId().equals(nodeDetail.getNodeId())
                && isMemberRejoinedAfterUnresponsiveness()) {
            // This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node
            // then start the scheduler again after cleaning the locally running tasks.
            becameUnresponsive(nodeDetail.getNodeId());
            try {
                //Remove from database
                taskStore.deleteTasks(nodeDetail.getNodeId());
            } catch (TaskCoordinationException e) {
                LOG.error("Error while removing the tasks of this node.", e);
            }
            reJoined(nodeDetail.getNodeId());
        }
    }

@@ -119,6 +131,16 @@ public void becameUnresponsive(String nodeId) {
        });
    }

    /**
     * Check whether the member has rejoined after being unresponsive.
     *
     * @return true if the member has rejoined after being unresponsive, false otherwise
     */
    public boolean isMemberRejoinedAfterUnresponsiveness() {
        return taskManager.getLocallyRunningCoordinatedTasks().size() > 0;
    }


    @Override
    public void reJoined(String nodeId) {

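The rejoin handling in this diff can be tried out in isolation with a small in-memory model. The sketch below is not the project's code: `RejoinSketch`, `InMemoryTaskStore`, `Node`, `runLocally`, and the restart counter are hypothetical stand-ins. It also assumes, based only on the comment in the diff, that `becameUnresponsive` clears the locally running tasks and that `reJoined` restarts the scheduler. The coordinator branch of `memberAdded` is omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class RejoinSketch {

    /** Hypothetical stand-in for the shared task store: task name -> assigned node id. */
    static final class InMemoryTaskStore {
        private final Map<String, String> assignments = new HashMap<>();

        void assign(String taskName, String nodeId) {
            assignments.put(taskName, nodeId);
        }

        /** Mirrors the intent of taskStore.deleteTasks(nodeId) in the diff. */
        void deleteTasks(String nodeId) {
            assignments.values().removeIf(nodeId::equals);
        }

        Set<String> tasksOf(String nodeId) {
            Set<String> result = new TreeSet<>();
            for (Map.Entry<String, String> entry : assignments.entrySet()) {
                if (entry.getValue().equals(nodeId)) {
                    result.add(entry.getKey());
                }
            }
            return result;
        }
    }

    /** Hypothetical stand-in for the cluster event listener running on one node. */
    static final class Node {
        private final String thisNodeId;
        private final InMemoryTaskStore taskStore;
        private final Set<String> locallyRunning = new HashSet<>();
        private int schedulerRestarts = 0;

        Node(String thisNodeId, InMemoryTaskStore taskStore) {
            this.thisNodeId = thisNodeId;
            this.taskStore = taskStore;
        }

        void runLocally(String taskName) {
            locallyRunning.add(taskName);
        }

        /** Same heuristic as the diff: leftover local tasks imply a rejoin. */
        boolean isMemberRejoinedAfterUnresponsiveness() {
            return !locallyRunning.isEmpty();
        }

        /** Only the rejoin branch of memberAdded is modeled here. */
        void memberAdded(String nodeId) {
            if (thisNodeId.equals(nodeId) && isMemberRejoinedAfterUnresponsiveness()) {
                locallyRunning.clear();        // assumed effect of becameUnresponsive(nodeId)
                taskStore.deleteTasks(nodeId); // remove this node's assignments from the store
                schedulerRestarts++;           // assumed effect of reJoined(nodeId)
            }
        }

        Set<String> locallyRunningTasks() {
            return new TreeSet<>(locallyRunning);
        }

        int schedulerRestarts() {
            return schedulerRestarts;
        }
    }

    public static void main(String[] args) {
        InMemoryTaskStore store = new InMemoryTaskStore();
        store.assign("taskA", "node-1");
        store.assign("taskB", "node-2");

        Node node = new Node("node-1", store);
        node.runLocally("taskA");   // task left running while the node was unresponsive
        node.memberAdded("node-1"); // the node sees itself added back to the cluster

        System.out.println("running locally: " + node.locallyRunningTasks());
        System.out.println("assigned to node-1: " + store.tasksOf("node-1"));
        System.out.println("assigned to node-2: " + store.tasksOf("node-2"));
    }
}
```

In this model, a member-added event for a different node, or for this node when nothing is running locally, leaves the store untouched. That matches the guard in the diff, which treats leftover locally running tasks as the sign of a rejoin.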
