diff --git a/src/main/java/com/starrocks/connector/spark/rest/RestService.java b/src/main/java/com/starrocks/connector/spark/rest/RestService.java index 9933479..c616258 100644 --- a/src/main/java/com/starrocks/connector/spark/rest/RestService.java +++ b/src/main/java/com/starrocks/connector/spark/rest/RestService.java @@ -51,6 +51,8 @@ import java.io.IOException; import java.io.Serializable; +import java.net.InetAddress; +import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -354,6 +356,32 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws StarrocksEx return queryPlan; } + /** + * check if BE node is active + * @param candidate BE node, format: : + * @return BE node status, active is true + */ + @VisibleForTesting + static boolean isCandidateActive(String candidate) { + String[] split = candidate.split(":"); + String host = split[0]; + int port = Integer.parseInt(split[1]); + Socket socket = null; + try { + socket = new Socket(InetAddress.getByName(host), port); + return true; + } catch (Exception e) { + return false; + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + } + } + } + } + /** * select which StarRocks BE to get tablet data. * @@ -364,6 +392,7 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws StarrocksEx */ @VisibleForTesting static Map> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws StarrocksException { + Map candidatesStatus = new HashMap<>(); Map> be2Tablets = new HashMap<>(); for (Map.Entry part : queryPlan.getPartitions().entrySet()) { logger.debug("Parse tablet info: '{}'.", part); @@ -378,6 +407,14 @@ static Map> selectBeForTablet(QueryPlan queryPlan, Logger log String target = null; int tabletCount = Integer.MAX_VALUE; for (String candidate : part.getValue().getRoutings()) { + // check if BE node is active and save the status + if (!candidatesStatus.containsKey(candidate)) { + candidatesStatus.put(candidate, isCandidateActive(candidate)); + } + // check if BE node is active, if not, continue + if (Boolean.FALSE.equals(candidatesStatus.get(candidate))) { + continue; + } logger.trace("Evaluate StarRocks BE '{}' to tablet '{}'.", candidate, tabletId); if (!be2Tablets.containsKey(candidate)) { logger.debug("Choice a new StarRocks BE '{}' for tablet '{}'.", candidate, tabletId);