Skip to content

Commit

Permalink
Merge pull request #3617 from malakaganga/fix_task_dup
Browse files Browse the repository at this point in the history
Fix task duplication in a volatile environment
  • Loading branch information
malakaganga authored Sep 20, 2024
2 parents 11bb8b0 + 35ff8b7 commit a6f83f2
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.micro.integrator.coordination.ClusterCoordinator;
import org.wso2.micro.integrator.coordination.MemberEventListener;
import org.wso2.micro.integrator.coordination.RDBMSMemberEventCallBack;
import org.wso2.micro.integrator.coordination.node.NodeDetail;
import org.wso2.micro.integrator.ntask.common.TaskException;
import org.wso2.micro.integrator.ntask.coordination.TaskCoordinationException;
Expand Down Expand Up @@ -79,13 +80,7 @@ && isMemberRejoinedAfterUnresponsiveness()) {
// This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node
// then start the scheduler again after cleaning the locally running tasks.
becameUnresponsive(nodeDetail.getNodeId());
try {
//Remove from database
taskStore.deleteTasks(nodeDetail.getNodeId());
} catch (TaskCoordinationException e) {
LOG.error("Error while removing the tasks of this node.", e);
}
reJoined(nodeDetail.getNodeId());
reJoined(nodeDetail.getNodeId(), null);
}
}

Expand Down Expand Up @@ -117,7 +112,8 @@ public void becameUnresponsive(String nodeId) {
ScheduledExecutorService taskScheduler = dataHolder.getTaskScheduler();
if (taskScheduler != null) {
LOG.info("Shutting down coordinated task scheduler scheduler since the node became unresponsive.");
taskScheduler.shutdown();
// Shutdown the task scheduler forcefully since node became unresponsive and need to avoid task duplication.
taskScheduler.shutdownNow();
dataHolder.setTaskScheduler(null);
}
List<String> tasks = taskManager.getLocallyRunningCoordinatedTasks();
Expand All @@ -129,6 +125,13 @@ public void becameUnresponsive(String nodeId) {
LOG.error("Unable to pause the task " + task, e);
}
});

try {
// Unassigns tasks from the specified node and updates their state in the task store.
taskStore.unAssignAndUpdateState(nodeId);
} catch (TaskCoordinationException e) {
LOG.error("Error while removing the tasks of this node.", e);
}
}

/**
Expand All @@ -142,22 +145,24 @@ public boolean isMemberRejoinedAfterUnresponsiveness() {


@Override
public void reJoined(String nodeId) {
public void reJoined(String nodeId, RDBMSMemberEventCallBack callBack) {

LOG.debug("This node re-joined the cluster successfully.");
try {
// removing the node id so that it will be resolved and assigned again in case if member removal
// hasn't happened already or the task hasn't been captured by task cleaning event.
// this will ensure that the task duplication doesn't occur.
taskStore.unAssignAndUpdateState(nodeId);
// start the scheduler again since the node joined cluster successfully.
CoordinatedTaskScheduleManager scheduleManager = new CoordinatedTaskScheduleManager(taskManager, taskStore,
clusterCoordinator, locationResolver);
scheduleManager.startTaskScheduler(" upon rejoining the cluster");
} catch (Throwable e) { // catching throwable so that we don't miss starting the scheduler
LOG.error("Error occurred while cleaning the tasks of node " + nodeId, e);
LOG.error("Error occurred while cleaning the tasks while rejoining of node " + nodeId, e);
if (callBack != null) {
callBack.onExceptionThrown(nodeId, e);
}
}
// start the scheduler again since the node joined cluster successfully.
CoordinatedTaskScheduleManager scheduleManager = new CoordinatedTaskScheduleManager(taskManager, taskStore,
clusterCoordinator,
locationResolver);
scheduleManager.startTaskScheduler(" upon rejoining the cluster");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void run() {
try {
pauseDeactivatedTasks();
scheduleAssignedTasks(CoordinatedTask.States.ACTIVATED);
checkInterrupted();
if (clusterCoordinator.isLeader()) {
// cleaning will run for each n times resolving frequency . ( n = 0,1,2 ... ).
if (resolveCount % resolvingFrequency == 0) {
Expand All @@ -101,11 +102,37 @@ public void run() {
}
// schedule all tasks assigned to this node and in state none
scheduleAssignedTasks(CoordinatedTask.States.NONE);
checkInterrupted();
} catch (Throwable throwable) { // catching throwable to prohibit permanent stopping of the executor service.
LOG.fatal("Unexpected error occurred while trying to schedule tasks.", throwable);
}
}

/**
* Check if the task is interrupted.
*
* @throws TaskCoordinationException when something goes wrong connecting to the store
*/
private void checkInterrupted() throws InterruptedException {

if (Thread.currentThread().isInterrupted()) {
try {
List<String> tasks = taskManager.getLocallyRunningCoordinatedTasks();
// stop all running coordinated tasks.
tasks.forEach(task -> {
try {
taskManager.stopExecution(task);
} catch (TaskException e) {
LOG.error("Unable to pause the task " + task, e);
}
});
} finally {
Thread.currentThread().interrupt();
throw new InterruptedException("Task was interrupted.");
}
}
}

/**
* Pause ( stop execution ) the deactivated tasks.
*
Expand Down Expand Up @@ -166,7 +193,8 @@ private void addFailedTasks() throws TaskCoordinationException {
* @param state - The state of the tasks which need to be scheduled.
* @throws TaskCoordinationException - When something goes wrong while retrieving all the assigned tasks.
*/
private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoordinationException {
private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoordinationException
, InterruptedException {

LOG.debug("Retrieving tasks assigned to this node and to be scheduled.");
List<String> tasksOfThisNode = taskStore.retrieveTaskNames(localNodeId, state);
Expand All @@ -184,6 +212,7 @@ private void scheduleAssignedTasks(CoordinatedTask.States state) throws TaskCoor
LOG.debug("Submitting retrieved task [" + taskName + "] to the task manager.");
}
try {
checkInterrupted();
taskManager.scheduleCoordinatedTask(taskName);
} catch (TaskException ex) {
if (!TaskException.Code.DATABASE_ERROR.equals(ex.getCode())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void becameUnresponsive(String nodeId) {
}

@Override
public void reJoined(String nodeId) {
public void reJoined(String nodeId, RDBMSMemberEventCallBack callBack) {
LOG.info("Re-joined the cluster successfully.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public abstract class MemberEventListener {
* Invoked when the node is back to active state after being inactive.
*
* @param nodeId - The Id of this node
* @param callBack - The callback to be called when an exception is thrown
*/
public abstract void reJoined(String nodeId);
public abstract void reJoined(String nodeId, RDBMSMemberEventCallBack callBack);

public String getGroupId() {
return this.groupId;
Expand Down
Loading

0 comments on commit a6f83f2

Please sign in to comment.