Skip to content

Commit

Permalink
fix: reuseEngine may throw NullPointerException exception #4662 (#4663)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlieYan24 authored Jun 19, 2023
1 parent 7a0ac08 commit 1149374
Showing 1 changed file with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.rpc.message.annotation.Receiver;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.MutablePair;

Expand Down Expand Up @@ -162,7 +164,7 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send
instances.keySet().toArray(new ScoreServiceInstance[0]);
EngineNode[] engineScoreList = getEngineNodeManager().getEngineNodes(scoreServiceInstances);

EngineNode engine = null;
List<EngineNode> engines = Lists.newArrayList();
int count = 1;
long timeout =
engineReuseRequest.getTimeOut() <= 0
Expand All @@ -176,7 +178,7 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send
long startTime = System.currentTimeMillis();
try {
LinkisUtils.waitUntil(
() -> selectEngineToReuse(MutablePair.of(count, reuseLimit), engine, engineScoreList),
() -> selectEngineToReuse(MutablePair.of(count, reuseLimit), engines, engineScoreList),
Duration.ofMillis(timeout));
} catch (TimeoutException e) {
throw new LinkisRetryException(
Expand All @@ -189,6 +191,7 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send
AMConstant.ENGINE_ERROR_CODE,
"Failed to reuse engineConn time taken " + (System.currentTimeMillis() - startTime));
}
EngineNode engine = engines.get(0);
logger.info(
"Finished to reuse Engine for request: "
+ engineReuseRequest
Expand All @@ -201,7 +204,7 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send
.stream()
.filter(kv -> kv.getKey().getServiceInstance().equals(engine.getServiceInstance()))
.collect(Collectors.toList());
if (engineServiceLabelList != null && !engineServiceLabelList.isEmpty()) {
if (!engineServiceLabelList.isEmpty()) {
engine.setLabels(engineServiceLabelList.get(0).getValue());
} else {
logger.info(
Expand All @@ -216,7 +219,7 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send

public boolean selectEngineToReuse(
MutablePair<Integer, Integer> count2reuseLimit,
EngineNode engine,
List<EngineNode> engines,
EngineNode[] engineScoreList) {
if (count2reuseLimit.getLeft() > count2reuseLimit.getRight()) {
throw new LinkisRetryException(
Expand All @@ -229,7 +232,8 @@ public boolean selectEngineToReuse(
}
EngineNode engineNode = (EngineNode) choseNode.get();
logger.info("prepare to reuse engineNode: " + engineNode.getServiceInstance());
engine =

EngineNode reuseEngine =
LinkisUtils.tryCatch(
() -> getEngineNodeManager().reuseEngine(engineNode),
(Throwable t) -> {
Expand All @@ -244,14 +248,18 @@ public boolean selectEngineToReuse(
}
return null;
});
if (engine == null) {
if (Objects.nonNull(reuseEngine)) {
engines.add(reuseEngine);
}

if (CollectionUtils.isEmpty(engines)) {
Integer count = count2reuseLimit.getKey() + 1;
count2reuseLimit.setLeft(count);
engineScoreList =
Arrays.stream(engineScoreList)
.filter(node -> !node.equals(choseNode.get()))
.toArray(EngineNode[]::new);
}
return engine != null;
return CollectionUtils.isNotEmpty(engines);
}
}

0 comments on commit 1149374

Please sign in to comment.