Skip to content

Commit

Permalink
Add PENDING type to healthchecks
Browse files Browse the repository at this point in the history
  • Loading branch information
andythsu authored and ebyhr committed Oct 11, 2024
1 parent 4f65808 commit b2215f0
Show file tree
Hide file tree
Showing 18 changed files with 201 additions and 70 deletions.
15 changes: 15 additions & 0 deletions docs/routing-rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ condition: 'request.getHeader("X-Trino-Client-Tags") contains "label=foo"'

If no rules match, then request is routed to adhoc.

### TrinoStatus

This class tracks the current state of a Trino cluster. It is updated on every health check.
There are three possible states:

- PENDING
  - A Trino cluster will show this state when it is still starting up. It will be treated as
    unhealthy by RoutingManager, and therefore requests will not be routed to PENDING clusters.
- HEALTHY
  - A Trino cluster will show this state when health checks report the cluster as healthy and ready.
    RoutingManager will only route requests to healthy clusters.
- UNHEALTHY
  - A Trino cluster will show this state when health checks report the cluster as unhealthy. RoutingManager
    will not route requests to unhealthy clusters.

### TrinoRequestUser

This class attempts to extract the user from a request. In order, it attempts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public record ClusterStats(
int runningQueryCount,
int queuedQueryCount,
int numWorkerNodes,
boolean healthy,
TrinoStatus trinoStatus,
String proxyTo,
String externalUrl,
String routingGroup,
Expand All @@ -41,7 +41,7 @@ public static final class Builder
private int runningQueryCount;
private int queuedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private TrinoStatus trinoStatus;
private String proxyTo;
private String externalUrl;
private String routingGroup;
Expand Down Expand Up @@ -70,9 +70,9 @@ public Builder numWorkerNodes(int numWorkerNodes)
return this;
}

public Builder healthy(boolean healthy)
public Builder trinoStatus(TrinoStatus trinoStatus)
{
this.healthy = healthy;
this.trinoStatus = trinoStatus;
return this;
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public ClusterStats build()
runningQueryCount,
queuedQueryCount,
numWorkerNodes,
healthy,
trinoStatus,
proxyTo,
externalUrl,
routingGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
.numWorkerNodes(activeWorkers)
.queuedQueryCount((int) result.get("queuedQueries"))
.runningQueryCount((int) result.get("runningQueries"))
.healthy(activeWorkers > 0)
.trinoStatus(activeWorkers > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY)
.proxyTo(backend.getProxyTo())
.externalUrl(backend.getExternalUrl())
.routingGroup(backend.getRoutingGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,31 @@ public ClusterStatsInfoApiMonitor(HttpClient client, MonitorConfiguration monito
@Override
public ClusterStats monitor(ProxyBackendConfiguration backend)
{
return ClusterStats.builder(backend.getName()).healthy(isReadyStatus(backend.getProxyTo()))
return ClusterStats.builder(backend.getName()).trinoStatus(checkStatus(backend.getProxyTo()))
.proxyTo(backend.getProxyTo())
.externalUrl(backend.getExternalUrl())
.routingGroup(backend.getRoutingGroup()).build();
}

private boolean isReadyStatus(String baseUrl)
private TrinoStatus checkStatus(String baseUrl)
{
return isReadyStatus(baseUrl, retries);
return checkStatus(baseUrl, retries);
}

private boolean isReadyStatus(String baseUrl, int retriesRemaining)
private TrinoStatus checkStatus(String baseUrl, int retriesRemaining)
{
Request request = prepareGet()
.setUri(uriBuilderFrom(URI.create(baseUrl)).appendPath("/v1/info").build())
.build();
try {
ServerInfo serverInfo = client.execute(request, SERVER_INFO_JSON_RESPONSE_HANDLER);
return !serverInfo.isStarting();
return serverInfo.isStarting() ? TrinoStatus.PENDING : TrinoStatus.HEALTHY;
}
catch (UnexpectedResponseException e) {
if (shouldRetry(e.getStatusCode())) {
if (retriesRemaining > 0) {
log.warn("Retrying health check on error: %s, ", e.toString());
return isReadyStatus(baseUrl, retriesRemaining - 1);
return checkStatus(baseUrl, retriesRemaining - 1);
}
else {
log.error("Encountered error %s, no retries remaining", e.toString());
Expand All @@ -86,7 +86,7 @@ private boolean isReadyStatus(String baseUrl, int retriesRemaining)
catch (Exception e) {
log.error(e, "Exception checking %s for health", request.getUri());
}
return false;
return TrinoStatus.UNHEALTHY;
}

public static boolean shouldRetry(int statusCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
partialState.put(rs.getString("state"), rs.getInt("count"));
}
return clusterStats
.healthy(true)
// at this point we can mark the cluster as HEALTHY because otherwise
// it wouldn't have gotten worker stats
.trinoStatus(TrinoStatus.HEALTHY)
.queuedQueryCount(partialState.getOrDefault("QUEUED", 0))
.runningQueryCount(partialState.getOrDefault("RUNNING", 0))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public HealthChecker(Notifier notifier)
public void observe(List<ClusterStats> clustersStats)
{
for (ClusterStats clusterStats : clustersStats) {
if (!clusterStats.healthy()) {
if (clusterStats.trinoStatus() == TrinoStatus.UNHEALTHY) {
notifyUnhealthyCluster(clusterStats);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

/**
 * Health state of a Trino cluster as determined by the gateway's health checks.
 *
 * <ul>
 * <li>{@link #PENDING}: the cluster is still spinning up. This state exists for
 * UI/observability purposes; functionally the cluster is treated as unhealthy.</li>
 * <li>{@link #HEALTHY}: health checks report the cluster as up.</li>
 * <li>{@link #UNHEALTHY}: health checks report the cluster as down.</li>
 * </ul>
 */
public enum TrinoStatus
{
/** The cluster is still starting up; functionally equivalent to unhealthy. */
PENDING,
/** Health checks report the cluster as up. */
HEALTHY,
/** Health checks report the cluster as down. */
UNHEALTHY
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.GatewayBackendManager;
import io.trino.gateway.ha.router.ResourceGroupsManager;
import io.trino.gateway.ha.router.RoutingManager;
Expand Down Expand Up @@ -52,13 +55,19 @@ public class EntityEditorResource
private final GatewayBackendManager gatewayBackendManager;
private final ResourceGroupsManager resourceGroupsManager;
private final RoutingManager routingManager;
private final BackendStateManager backendStateManager;

@Inject
public EntityEditorResource(GatewayBackendManager gatewayBackendManager, ResourceGroupsManager resourceGroupsManager, RoutingManager routingManager)
public EntityEditorResource(
GatewayBackendManager gatewayBackendManager,
ResourceGroupsManager resourceGroupsManager,
RoutingManager routingManager,
BackendStateManager backendStateManager)
{
this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
this.resourceGroupsManager = requireNonNull(resourceGroupsManager, "resourceGroupsManager is null");
this.routingManager = requireNonNull(routingManager, "routingManager is null");
this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null");
}

@GET
Expand Down Expand Up @@ -87,7 +96,15 @@ public Response updateEntity(
OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class);
gatewayBackendManager.updateBackend(backend);
log.info("Marking the cluster %s %s", backend.getName(), backend.isActive() ? "active" : "inactive");
routingManager.updateBackEndHealth(backend.getName(), backend.isActive());
// We mark Trino PENDING here so gateway won't immediately route traffic to this cluster yet
// until it is marked healthy by the healthcheck
TrinoStatus trinoStatus = backend.isActive() ? TrinoStatus.PENDING : TrinoStatus.UNHEALTHY;
routingManager.updateBackEndHealth(backend.getName(), trinoStatus);
backendStateManager.updateStates(
backend.getName(),
ClusterStats.builder(backend.getName())
.trinoStatus(trinoStatus)
.build());
break;
case RESOURCE_GROUP:
ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -45,7 +46,7 @@ static class LocalStats
{
private int runningQueryCount;
private int queuedQueryCount;
private boolean healthy;
private TrinoStatus trinoStatus;
private String proxyTo;
private String routingGroup;
private String clusterId;
Expand All @@ -56,7 +57,7 @@ static class LocalStats
clusterId = stats.clusterId();
runningQueryCount = stats.runningQueryCount();
queuedQueryCount = stats.queuedQueryCount();
healthy = stats.healthy();
trinoStatus = stats.trinoStatus();
proxyTo = stats.proxyTo();
routingGroup = stats.routingGroup();
if (stats.userQueuedCount() != null) {
Expand Down Expand Up @@ -92,14 +93,14 @@ public void queuedQueryCount(int queuedQueryCount)
this.queuedQueryCount = queuedQueryCount;
}

public boolean healthy()
public TrinoStatus trinoStatus()
{
return this.healthy;
return this.trinoStatus;
}

public void healthy(boolean healthy)
public void trinoStatus(TrinoStatus trinoStatus)
{
this.healthy = healthy;
this.trinoStatus = trinoStatus;
}

public String proxyTo()
Expand Down Expand Up @@ -186,7 +187,7 @@ private synchronized Optional<LocalStats> getClusterToRoute(String user, String
{
log.debug("sorting cluster stats for %s %s", user, routingGroup);
List<LocalStats> filteredList = clusterStats.stream()
.filter(stats -> stats.healthy())
.filter(stats -> stats.trinoStatus() == TrinoStatus.HEALTHY)
.filter(stats -> routingGroup.equals(stats.routingGroup()))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.cache.LoadingCache;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import jakarta.ws.rs.HttpMethod;

Expand Down Expand Up @@ -45,7 +46,7 @@ public abstract class RoutingManager
private final LoadingCache<String, String> queryIdBackendCache;
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final GatewayBackendManager gatewayBackendManager;
private final ConcurrentHashMap<String, Boolean> backendToHealth;
private final ConcurrentHashMap<String, TrinoStatus> backendToStatus;

public RoutingManager(GatewayBackendManager gatewayBackendManager)
{
Expand All @@ -64,7 +65,7 @@ public String load(String queryId)
}
});

this.backendToHealth = new ConcurrentHashMap<String, Boolean>();
this.backendToStatus = new ConcurrentHashMap<>();
}

protected GatewayBackendManager getGatewayBackendManager()
Expand Down Expand Up @@ -123,16 +124,16 @@ public String findBackendForQueryId(String queryId)
return backendAddress;
}

public void updateBackEndHealth(String backendId, Boolean value)
public void updateBackEndHealth(String backendId, TrinoStatus value)
{
log.info("backend %s isHealthy %s", backendId, value);
backendToHealth.put(backendId, value);
backendToStatus.put(backendId, value);
}

public void updateBackEndStats(List<ClusterStats> stats)
{
for (ClusterStats clusterStats : stats) {
updateBackEndHealth(clusterStats.clusterId(), clusterStats.healthy());
updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus());
}
}

Expand Down Expand Up @@ -183,14 +184,14 @@ protected String findBackendForUnknownQueryId(String queryId)
// Returns true unless the backend is known to be HEALTHY (i.e. unknown, PENDING, or UNHEALTHY)
private boolean isBackendNotHealthy(String backendId)
{
if (backendToHealth.isEmpty()) {
if (backendToStatus.isEmpty()) {
log.error("backends can not be empty");
return true;
}
Boolean isHealthy = backendToHealth.get(backendId);
if (isHealthy == null) {
TrinoStatus status = backendToStatus.get(backendId);
if (status == null) {
return true;
}
return !isHealthy;
return status != TrinoStatus.HEALTHY;
}
}
Loading

0 comments on commit b2215f0

Please sign in to comment.