Skip to content

Commit

Permalink
[#2165] fix(client): Fix the exception caused by repeated unregister …
Browse files Browse the repository at this point in the history
…shuffle in tez tasks. (#2166)

### What changes were proposed in this pull request?

Do not unregister repeatedly.

### Why are the changes needed?

Fix: #2165 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

integration test
  • Loading branch information
zhengchenyu authored Oct 12, 2024
1 parent 43e8a7d commit d9ed675
Showing 1 changed file with 41 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,46 +1033,49 @@ public void unregisterShuffle(String appId) {
ExecutorService executorService = null;
try {
int concurrency = Math.min(unregisterThreadPoolSize, shuffleServerInfos.size());
executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency, "unregister-shuffle");
if (concurrency > 0) {
executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency, "unregister-shuffle");

ThreadUtils.executeTasks(
executorService,
shuffleServerInfos,
shuffleServerInfo -> {
try {
ShuffleServerClient client =
ShuffleServerClientFactory.getInstance()
.getShuffleServerClient(clientType, shuffleServerInfo, rssConf);
RssUnregisterShuffleByAppIdResponse response =
client.unregisterShuffleByAppId(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Successfully unregistered shuffle from {}", shuffleServerInfo);
} else {
LOG.warn("Failed to unregister shuffle from {}", shuffleServerInfo);
}
} catch (Exception e) {
// this request observed the unregisterRequestTimeSec timeout
if (e instanceof StatusRuntimeException
&& ((StatusRuntimeException) e).getStatus().getCode()
== Status.DEADLINE_EXCEEDED.getCode()) {
LOG.warn(
"Timeout occurred while unregistering from {}. The request timeout is {}s: {}",
shuffleServerInfo,
unregisterRequestTimeSec,
((StatusRuntimeException) e).getStatus().getDescription());
} else {
LOG.warn("Error while unregistering from {}", shuffleServerInfo, e);
ThreadUtils.executeTasks(
executorService,
shuffleServerInfos,
shuffleServerInfo -> {
try {
ShuffleServerClient client =
ShuffleServerClientFactory.getInstance()
.getShuffleServerClient(clientType, shuffleServerInfo, rssConf);
RssUnregisterShuffleByAppIdResponse response =
client.unregisterShuffleByAppId(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Successfully unregistered shuffle from {}", shuffleServerInfo);
} else {
LOG.warn("Failed to unregister shuffle from {}", shuffleServerInfo);
}
} catch (Exception e) {
// this request observed the unregisterRequestTimeSec timeout
if (e instanceof StatusRuntimeException
&& ((StatusRuntimeException) e).getStatus().getCode()
== Status.DEADLINE_EXCEEDED.getCode()) {
LOG.warn(
"Timeout occurred while unregistering from {}. The request timeout is {}s: {}",
shuffleServerInfo,
unregisterRequestTimeSec,
((StatusRuntimeException) e).getStatus().getDescription());
} else {
LOG.warn("Error while unregistering from {}", shuffleServerInfo, e);
}
}
}
return null;
},
unregisterTimeMs,
"unregister shuffle server",
String.format(
"Please consider increasing the thread pool size (%s) or the overall timeout (%ss) "
+ "if you still think the request timeout (%ss) is sensible.",
unregisterThreadPoolSize, unregisterTimeSec, unregisterRequestTimeSec));

return null;
},
unregisterTimeMs,
"unregister shuffle server",
String.format(
"Please consider increasing the thread pool size (%s) or the overall timeout (%ss) "
+ "if you still think the request timeout (%ss) is sensible.",
unregisterThreadPoolSize, unregisterTimeSec, unregisterRequestTimeSec));
} else {
LOG.info("No need to unregister shuffle.");
}
} finally {
if (executorService != null) {
executorService.shutdownNow();
Expand Down

0 comments on commit d9ed675

Please sign in to comment.