Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PENDING type to healthchecks #360

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/routing-rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ condition: 'request.getHeader("X-Trino-Client-Tags") contains "label=foo"'

If no rules match, then the request is routed to adhoc.

### TrinoStatus
ebyhr marked this conversation as resolved.
Show resolved Hide resolved

This enum tracks the current state of a Trino cluster. It is updated on every healthcheck.
There are three possible states:

- PENDING
- A Trino cluster will show this state when it is still starting up. It is treated as
unhealthy by RoutingManager, and therefore requests will not be routed to PENDING clusters.
- HEALTHY
- A Trino cluster will show this state when healthchecks report the cluster as healthy and ready.
RoutingManager will only route requests to healthy clusters.
- UNHEALTHY
- A Trino cluster will show this state when healthchecks report the cluster as unhealthy. RoutingManager
will not route requests to unhealthy clusters.

### TrinoRequestUser

This class attempts to extract the user from a request. In order, it attempts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public record ClusterStats(
int runningQueryCount,
int queuedQueryCount,
int numWorkerNodes,
boolean healthy,
TrinoStatus trinoStatus,
String proxyTo,
String externalUrl,
String routingGroup,
Expand All @@ -41,7 +41,7 @@ public static final class Builder
private int runningQueryCount;
private int queuedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private TrinoStatus trinoStatus;
private String proxyTo;
private String externalUrl;
private String routingGroup;
Expand Down Expand Up @@ -70,9 +70,9 @@ public Builder numWorkerNodes(int numWorkerNodes)
return this;
}

public Builder healthy(boolean healthy)
public Builder trinoStatus(TrinoStatus trinoStatus)
{
this.healthy = healthy;
this.trinoStatus = trinoStatus;
return this;
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public ClusterStats build()
runningQueryCount,
queuedQueryCount,
numWorkerNodes,
healthy,
trinoStatus,
proxyTo,
externalUrl,
routingGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
.numWorkerNodes(activeWorkers)
.queuedQueryCount((int) result.get("queuedQueries"))
.runningQueryCount((int) result.get("runningQueries"))
.healthy(activeWorkers > 0)
.trinoStatus(activeWorkers > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY)
.proxyTo(backend.getProxyTo())
.externalUrl(backend.getExternalUrl())
.routingGroup(backend.getRoutingGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,31 @@ public ClusterStatsInfoApiMonitor(HttpClient client, MonitorConfiguration monito
@Override
public ClusterStats monitor(ProxyBackendConfiguration backend)
{
return ClusterStats.builder(backend.getName()).healthy(isReadyStatus(backend.getProxyTo()))
return ClusterStats.builder(backend.getName()).trinoStatus(checkStatus(backend.getProxyTo()))
.proxyTo(backend.getProxyTo())
.externalUrl(backend.getExternalUrl())
.routingGroup(backend.getRoutingGroup()).build();
}

private boolean isReadyStatus(String baseUrl)
private TrinoStatus checkStatus(String baseUrl)
{
return isReadyStatus(baseUrl, retries);
return checkStatus(baseUrl, retries);
}

private boolean isReadyStatus(String baseUrl, int retriesRemaining)
private TrinoStatus checkStatus(String baseUrl, int retriesRemaining)
{
Request request = prepareGet()
.setUri(uriBuilderFrom(URI.create(baseUrl)).appendPath("/v1/info").build())
.build();
try {
ServerInfo serverInfo = client.execute(request, SERVER_INFO_JSON_RESPONSE_HANDLER);
return !serverInfo.isStarting();
return serverInfo.isStarting() ? TrinoStatus.PENDING : TrinoStatus.HEALTHY;
}
catch (UnexpectedResponseException e) {
if (shouldRetry(e.getStatusCode())) {
if (retriesRemaining > 0) {
log.warn("Retrying health check on error: %s, ", e.toString());
return isReadyStatus(baseUrl, retriesRemaining - 1);
return checkStatus(baseUrl, retriesRemaining - 1);
}
else {
log.error("Encountered error %s, no retries remaining", e.toString());
Expand All @@ -86,7 +86,7 @@ private boolean isReadyStatus(String baseUrl, int retriesRemaining)
catch (Exception e) {
log.error(e, "Exception checking %s for health", request.getUri());
}
return false;
return TrinoStatus.UNHEALTHY;
}

public static boolean shouldRetry(int statusCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
partialState.put(rs.getString("state"), rs.getInt("count"));
}
return clusterStats
.healthy(true)
// at this point we can mark the cluster HEALTHY because otherwise
// it wouldn't have gotten worker stats
.trinoStatus(TrinoStatus.HEALTHY)
.queuedQueryCount(partialState.getOrDefault("QUEUED", 0))
.runningQueryCount(partialState.getOrDefault("RUNNING", 0))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public HealthChecker(Notifier notifier)
public void observe(List<ClusterStats> clustersStats)
{
for (ClusterStats clusterStats : clustersStats) {
if (!clusterStats.healthy()) {
if (clusterStats.trinoStatus() == TrinoStatus.UNHEALTHY) {
notifyUnhealthyCluster(clusterStats);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

/**
 * Health state of a Trino cluster as tracked by the gateway.
 *
 * <ul>
 * <li>PENDING is for ui/observability purposes; functionally it is treated as unhealthy.
 * It should be used while a Trino cluster is still spinning up.</li>
 * <li>HEALTHY is when health checks report the cluster as up. Only HEALTHY clusters
 * are eligible for routing.</li>
 * <li>UNHEALTHY is when health checks report the cluster as down.</li>
 * </ul>
 */
public enum TrinoStatus
{
// Cluster is still starting up; not routed to, but not reported as unhealthy either
PENDING,
// Health checks report the cluster as up and ready
HEALTHY,
// Health checks report the cluster as down, or the health check itself failed
UNHEALTHY
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.GatewayBackendManager;
import io.trino.gateway.ha.router.ResourceGroupsManager;
import io.trino.gateway.ha.router.RoutingManager;
Expand Down Expand Up @@ -52,13 +55,19 @@ public class EntityEditorResource
private final GatewayBackendManager gatewayBackendManager;
private final ResourceGroupsManager resourceGroupsManager;
private final RoutingManager routingManager;
private final BackendStateManager backendStateManager;

@Inject
public EntityEditorResource(GatewayBackendManager gatewayBackendManager, ResourceGroupsManager resourceGroupsManager, RoutingManager routingManager)
public EntityEditorResource(
GatewayBackendManager gatewayBackendManager,
ResourceGroupsManager resourceGroupsManager,
RoutingManager routingManager,
BackendStateManager backendStateManager)
{
this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
this.resourceGroupsManager = requireNonNull(resourceGroupsManager, "resourceGroupsManager is null");
this.routingManager = requireNonNull(routingManager, "routingManager is null");
this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null");
}

@GET
Expand Down Expand Up @@ -86,8 +95,16 @@ public Response updateEntity(
ProxyBackendConfiguration backend =
OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class);
gatewayBackendManager.updateBackend(backend);
log.info("Setting up the backend %s with healthy state", backend.getName());
routingManager.updateBackEndHealth(backend.getName(), backend.isActive());
log.info("Marking the cluster %s %s", backend.getName(), backend.isActive() ? "active" : "inactive");
// We mark Trino PENDING here so the gateway won't route traffic to this cluster
// until it is marked healthy by the healthcheck
TrinoStatus trinoStatus = backend.isActive() ? TrinoStatus.PENDING : TrinoStatus.UNHEALTHY;
andythsu marked this conversation as resolved.
Show resolved Hide resolved
routingManager.updateBackEndHealth(backend.getName(), trinoStatus);
backendStateManager.updateStates(
andythsu marked this conversation as resolved.
Show resolved Hide resolved
backend.getName(),
ClusterStats.builder(backend.getName())
.trinoStatus(trinoStatus)
.build());
break;
case RESOURCE_GROUP:
ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -45,7 +46,7 @@ static class LocalStats
{
private int runningQueryCount;
private int queuedQueryCount;
private boolean healthy;
private TrinoStatus trinoStatus;
private String proxyTo;
private String routingGroup;
private String clusterId;
Expand All @@ -56,7 +57,7 @@ static class LocalStats
clusterId = stats.clusterId();
runningQueryCount = stats.runningQueryCount();
queuedQueryCount = stats.queuedQueryCount();
healthy = stats.healthy();
trinoStatus = stats.trinoStatus();
proxyTo = stats.proxyTo();
routingGroup = stats.routingGroup();
if (stats.userQueuedCount() != null) {
Expand Down Expand Up @@ -92,14 +93,14 @@ public void queuedQueryCount(int queuedQueryCount)
this.queuedQueryCount = queuedQueryCount;
}

public boolean healthy()
public TrinoStatus trinoStatus()
{
return this.healthy;
return this.trinoStatus;
}

public void healthy(boolean healthy)
public void trinoStatus(TrinoStatus trinoStatus)
{
this.healthy = healthy;
this.trinoStatus = trinoStatus;
}

public String proxyTo()
Expand Down Expand Up @@ -186,7 +187,7 @@ private synchronized Optional<LocalStats> getClusterToRoute(String user, String
{
log.debug("sorting cluster stats for %s %s", user, routingGroup);
List<LocalStats> filteredList = clusterStats.stream()
.filter(stats -> stats.healthy())
.filter(stats -> stats.trinoStatus() == TrinoStatus.HEALTHY)
.filter(stats -> routingGroup.equals(stats.routingGroup()))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.cache.LoadingCache;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import jakarta.ws.rs.HttpMethod;

Expand Down Expand Up @@ -45,7 +46,7 @@ public abstract class RoutingManager
private final LoadingCache<String, String> queryIdBackendCache;
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final GatewayBackendManager gatewayBackendManager;
private final ConcurrentHashMap<String, Boolean> backendToHealth;
private final ConcurrentHashMap<String, TrinoStatus> backendToStatus;

public RoutingManager(GatewayBackendManager gatewayBackendManager)
{
Expand All @@ -64,7 +65,7 @@ public String load(String queryId)
}
});

this.backendToHealth = new ConcurrentHashMap<String, Boolean>();
this.backendToStatus = new ConcurrentHashMap<>();
}

protected GatewayBackendManager getGatewayBackendManager()
Expand Down Expand Up @@ -123,16 +124,16 @@ public String findBackendForQueryId(String queryId)
return backendAddress;
}

public void updateBackEndHealth(String backendId, Boolean value)
public void updateBackEndHealth(String backendId, TrinoStatus value)
{
log.info("backend %s isHealthy %s", backendId, value);
backendToHealth.put(backendId, value);
backendToStatus.put(backendId, value);
}

public void updateBackEndStats(List<ClusterStats> stats)
{
for (ClusterStats clusterStats : stats) {
updateBackEndHealth(clusterStats.clusterId(), clusterStats.healthy());
updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus());
}
}

Expand Down Expand Up @@ -183,14 +184,14 @@ protected String findBackendForUnknownQueryId(String queryId)
// Returns true unless the backend's recorded status is HEALTHY (backends with no recorded status count as not healthy)
private boolean isBackendNotHealthy(String backendId)
{
if (backendToHealth.isEmpty()) {
if (backendToStatus.isEmpty()) {
log.error("backends can not be empty");
return true;
}
Boolean isHealthy = backendToHealth.get(backendId);
if (isHealthy == null) {
TrinoStatus status = backendToStatus.get(backendId);
if (status == null) {
return true;
}
return !isHealthy;
return status != TrinoStatus.HEALTHY;
}
}
Loading