Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix model delete failed after model undeployed #2613

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
import static org.opensearch.ml.common.CommonValue.UNDEPLOYED;
import static org.opensearch.ml.plugin.MachineLearningPlugin.DEPLOY_THREAD_POOL;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that multiple Undeploy actions blocking the Deploy_Thread_Pool will cause "Deploy" to fail?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of cases, the update index completes immediately which means there's no actual blocking here. In edge cases, if the update completes relatively slow, the requests will be queued. In a more edge case, the queue is full, then the deploy model will fail. But this usually mean the system is not in a correct status(updating index taking too long time). Also this failure is recoverable which doesn't seem a critical issue.


import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.opensearch.action.FailedNodeException;
Expand All @@ -29,6 +32,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
Expand All @@ -44,7 +48,6 @@
import org.opensearch.ml.model.MLModelManager;
import org.opensearch.ml.stats.MLNodeLevelStat;
import org.opensearch.ml.stats.MLStats;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -80,7 +83,7 @@ public TransportUndeployModelAction(
actionFilters,
MLUndeployModelNodesRequest::new,
MLUndeployModelNodeRequest::new,
ThreadPool.Names.MANAGEMENT,
DEPLOY_THREAD_POOL,
MLUndeployModelNodeResponse.class
);
this.mlModelManager = mlModelManager;
Expand All @@ -92,31 +95,22 @@ public TransportUndeployModelAction(
}

@Override
protected void doExecute(Task task, MLUndeployModelNodesRequest request, ActionListener<MLUndeployModelNodesResponse> listener) {
ActionListener<MLUndeployModelNodesResponse> wrappedListener = ActionListener.wrap(undeployModelNodesResponse -> {
processUndeployModelResponseAndUpdate(undeployModelNodesResponse, listener);
}, listener::onFailure);
super.doExecute(task, request, wrappedListener);
}

void processUndeployModelResponseAndUpdate(
MLUndeployModelNodesResponse undeployModelNodesResponse,
ActionListener<MLUndeployModelNodesResponse> listener
protected MLUndeployModelNodesResponse newResponse(
MLUndeployModelNodesRequest nodesRequest,
List<MLUndeployModelNodeResponse> responses,
List<FailedNodeException> failures
) {
List<MLUndeployModelNodeResponse> responses = undeployModelNodesResponse.getNodes();
if (responses == null || responses.isEmpty()) {
listener.onResponse(undeployModelNodesResponse);
return;
if (CollectionUtils.isEmpty(responses)) {
return new MLUndeployModelNodesResponse(clusterService.getClusterName(), responses, failures);
}

Map<String, List<String>> actualRemovedNodesMap = new HashMap<>();
Map<String, String[]> modelWorkNodesBeforeRemoval = new HashMap<>();
responses.forEach(r -> {
Map<String, String[]> nodeCounts = r.getModelWorkerNodeBeforeRemoval();

if (nodeCounts != null) {
for (Map.Entry<String, String[]> entry : nodeCounts.entrySet()) {
// when undeploy an undeployed model, the entry.getvalue() is null
// when undeploy a undeployed model, the entry.getvalue() is null
if (entry.getValue() != null
&& (!modelWorkNodesBeforeRemoval.containsKey(entry.getKey())
|| modelWorkNodesBeforeRemoval.get(entry.getKey()).length < entry.getValue().length)) {
Expand Down Expand Up @@ -144,8 +138,9 @@ void processUndeployModelResponseAndUpdate(
.build();

MLSyncUpNodesRequest syncUpRequest = new MLSyncUpNodesRequest(nodeFilter.getAllNodes(), syncUpInput);
CountDownLatch countDownLatch = new CountDownLatch(1);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
if (actualRemovedNodesMap.size() > 0) {
if (!actualRemovedNodesMap.isEmpty()) {
BulkRequest bulkRequest = new BulkRequest();
Map<String, Boolean> deployToAllNodes = new HashMap<>();
for (String modelId : actualRemovedNodesMap.keySet()) {
Expand Down Expand Up @@ -188,24 +183,35 @@ void processUndeployModelResponseAndUpdate(
"updated model state as undeployed for : {}",
Arrays.toString(actualRemovedNodesMap.keySet().toArray(new String[0]))
);
}, e -> { log.error("Failed to update model state as undeployed", e); });
countDownLatch.countDown();
}, e -> {
log.error("Failed to update model state as undeployed", e);
countDownLatch.countDown();
});
client.bulk(bulkRequest, ActionListener.runAfter(actionListener, () -> {
syncUpUndeployedModels(syncUpRequest);
listener.onResponse(undeployModelNodesResponse);
context.restore();
}));
} else {
syncUpUndeployedModels(syncUpRequest);
listener.onResponse(undeployModelNodesResponse);
context.restore();
countDownLatch.countDown();
}
}
if (countDownLatch.getCount() != 0) {
try {
boolean success = countDownLatch.await(1000, TimeUnit.MILLISECONDS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this 1 second too large? In most cases the update index happen pretty fast.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a at most value, if the update index happens within 1s, the await will break immediately after it's done. If update takes more than 1s, then await waits at most 1s to break.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block current thread, which not recommended. We saw issues several times for such blocking thread code

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're not blocking event loop thread, it's safe.

if (!success) {
log
.error(
"Failed to update model state as undeployed in model index after waiting for 1 second, please check model status manually"
);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to update model state as undeployed in model index, current thread is interrupted", e);
}
}
}

@Override
protected MLUndeployModelNodesResponse newResponse(
MLUndeployModelNodesRequest nodesRequest,
List<MLUndeployModelNodeResponse> responses,
List<FailedNodeException> failures
) {
return new MLUndeployModelNodesResponse(clusterService.getClusterName(), responses, failures);
}

Expand Down
Loading
Loading