From b2215f004843e62eabbd8e19306326a6e22bd293 Mon Sep 17 00:00:00 2001 From: andythsu Date: Thu, 30 May 2024 12:03:18 -0400 Subject: [PATCH] Add PENDING type to healthchecks --- docs/routing-rules.md | 15 ++++++ .../ha/clustermonitor/ClusterStats.java | 10 ++-- .../ClusterStatsHttpMonitor.java | 2 +- .../ClusterStatsInfoApiMonitor.java | 14 ++--- .../ClusterStatsJdbcMonitor.java | 4 +- .../ha/clustermonitor/HealthChecker.java | 2 +- .../ha/clustermonitor/TrinoStatus.java | 27 ++++++++++ .../ha/resource/EntityEditorResource.java | 21 +++++++- .../ha/router/QueryCountBasedRouter.java | 15 +++--- .../gateway/ha/router/RoutingManager.java | 19 +++---- .../trino/gateway/ha/HaGatewayTestUtils.java | 52 ++++++++++++------- .../ha/TestGatewayHaMultipleBackend.java | 31 +++++++++-- .../TestClusterStatsMonitor.java | 2 +- .../ha/router/TestQueryCountBasedRouter.java | 17 +++--- .../router/TestStochasticRoutingManager.java | 7 +-- .../test/resources/test-config-template.yml | 11 ++++ .../test-config-with-routing-template.yml | 11 ++++ ...st-config-without-x-forwarded-template.yml | 11 ++++ 18 files changed, 201 insertions(+), 70 deletions(-) create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoStatus.java diff --git a/docs/routing-rules.md b/docs/routing-rules.md index 7c407ad72..5e8819bba 100644 --- a/docs/routing-rules.md +++ b/docs/routing-rules.md @@ -134,6 +134,21 @@ condition: 'request.getHeader("X-Trino-Client-Tags") contains "label=foo"' If no rules match, then request is routed to adhoc. +### TrinoStatus + +This class attempts to track the current state of Trino cluster. It is updated per every healthcheck. +There are three possible states + +- PENDING + - A Trino cluster will show this state when it is still starting up. It will be treated as + unhealthy by RoutingManager, and therefore requests will not be routed to PENDING clusters +- HEALTHY + - A Trino cluster will show this state when healthchecks report clusters as healthy and ready. + RoutingManager will only route requests to healthy clusters +- UNHEALTHY + - A Trino cluster will show this state when healthchecks report clusters as unhealthy. RoutingManager + will not route requests to unhealthy clusters. + ### TrinoRequestUser This class attempts to extract the user from a request. In order, it attempts diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java index 696cf1e63..da267ef18 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java @@ -24,7 +24,7 @@ public record ClusterStats( int runningQueryCount, int queuedQueryCount, int numWorkerNodes, - boolean healthy, + TrinoStatus trinoStatus, String proxyTo, String externalUrl, String routingGroup, @@ -41,7 +41,7 @@ public static final class Builder private int runningQueryCount; private int queuedQueryCount; private int numWorkerNodes; - private boolean healthy; + private TrinoStatus trinoStatus; private String proxyTo; private String externalUrl; private String routingGroup; @@ -70,9 +70,9 @@ public Builder numWorkerNodes(int numWorkerNodes) return this; } - public Builder healthy(boolean healthy) + public Builder trinoStatus(TrinoStatus trinoStatus) { - this.healthy = healthy; + this.trinoStatus = trinoStatus; return this; } @@ -107,7 +107,7 @@ public ClusterStats build() runningQueryCount, queuedQueryCount, numWorkerNodes, - healthy, + trinoStatus, proxyTo, externalUrl, routingGroup, diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java index 3ee9ed279..a948bc033 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java @@ -72,7 +72,7 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) .numWorkerNodes(activeWorkers) .queuedQueryCount((int) result.get("queuedQueries")) .runningQueryCount((int) result.get("runningQueries")) - .healthy(activeWorkers > 0) + .trinoStatus(activeWorkers > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY) .proxyTo(backend.getProxyTo()) .externalUrl(backend.getExternalUrl()) .routingGroup(backend.getRoutingGroup()); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java index 377244d47..7326c444f 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java @@ -49,31 +49,31 @@ public ClusterStatsInfoApiMonitor(HttpClient client, MonitorConfiguration monito @Override public ClusterStats monitor(ProxyBackendConfiguration backend) { - return ClusterStats.builder(backend.getName()).healthy(isReadyStatus(backend.getProxyTo())) + return ClusterStats.builder(backend.getName()).trinoStatus(checkStatus(backend.getProxyTo())) .proxyTo(backend.getProxyTo()) .externalUrl(backend.getExternalUrl()) .routingGroup(backend.getRoutingGroup()).build(); } - private boolean isReadyStatus(String baseUrl) + private TrinoStatus checkStatus(String baseUrl) { - return isReadyStatus(baseUrl, retries); + return checkStatus(baseUrl, retries); } - private boolean isReadyStatus(String baseUrl, int retriesRemaining) + private TrinoStatus checkStatus(String baseUrl, int retriesRemaining) { Request request = prepareGet() .setUri(uriBuilderFrom(URI.create(baseUrl)).appendPath("/v1/info").build()) .build(); try { ServerInfo serverInfo = client.execute(request, SERVER_INFO_JSON_RESPONSE_HANDLER); - return !serverInfo.isStarting(); + return serverInfo.isStarting() ? TrinoStatus.PENDING : TrinoStatus.HEALTHY; } catch (UnexpectedResponseException e) { if (shouldRetry(e.getStatusCode())) { if (retriesRemaining > 0) { log.warn("Retrying health check on error: %s, ", e.toString()); - return isReadyStatus(baseUrl, retriesRemaining - 1); + return checkStatus(baseUrl, retriesRemaining - 1); } else { log.error("Encountered error %s, no retries remaining", e.toString()); @@ -86,7 +86,7 @@ private boolean isReadyStatus(String baseUrl, int retriesRemaining) catch (Exception e) { log.error(e, "Exception checking %s for health", request.getUri()); } - return false; + return TrinoStatus.UNHEALTHY; } public static boolean shouldRetry(int statusCode) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java index 4e9f24989..af4bffcea 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java @@ -83,7 +83,9 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) partialState.put(rs.getString("state"), rs.getInt("count")); } return clusterStats - .healthy(true) + // at this point we can set cluster to trinoStatus because otherwise + // it wouldn't have gotten worker stats + .trinoStatus(TrinoStatus.HEALTHY) .queuedQueryCount(partialState.getOrDefault("QUEUED", 0)) .runningQueryCount(partialState.getOrDefault("RUNNING", 0)) .build(); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java index 58bb7608d..cfd0d19ad 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java @@ -34,7 +34,7 @@ public HealthChecker(Notifier notifier) public void observe(List clustersStats) { for (ClusterStats clusterStats : clustersStats) { - if (!clusterStats.healthy()) { + if (clusterStats.trinoStatus() == TrinoStatus.UNHEALTHY) { notifyUnhealthyCluster(clusterStats); } else { diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoStatus.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoStatus.java new file mode 100644 index 000000000..e33dcc0f6 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoStatus.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +/** + * PENDING is for ui/observability purpose and functionally it's unhealthy + * We should use PENDING when Trino clusters are still spinning up + * HEALTHY is when health checks report clusters as up + * UNHEALTHY is when health checks report clusters as down + */ +public enum TrinoStatus +{ + PENDING, + HEALTHY, + UNHEALTHY +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java index 4002e6ea1..7fffaa26d 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java @@ -18,7 +18,10 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.airlift.log.Logger; +import io.trino.gateway.ha.clustermonitor.ClusterStats; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.router.BackendStateManager; import io.trino.gateway.ha.router.GatewayBackendManager; import io.trino.gateway.ha.router.ResourceGroupsManager; import io.trino.gateway.ha.router.RoutingManager; @@ -52,13 +55,19 @@ public class EntityEditorResource private final GatewayBackendManager gatewayBackendManager; private final ResourceGroupsManager resourceGroupsManager; private final RoutingManager routingManager; + private final BackendStateManager backendStateManager; @Inject - public EntityEditorResource(GatewayBackendManager gatewayBackendManager, ResourceGroupsManager resourceGroupsManager, RoutingManager routingManager) + public EntityEditorResource( + GatewayBackendManager gatewayBackendManager, + ResourceGroupsManager resourceGroupsManager, + RoutingManager routingManager, + BackendStateManager backendStateManager) { this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null"); this.resourceGroupsManager = requireNonNull(resourceGroupsManager, "resourceGroupsManager is null"); this.routingManager = requireNonNull(routingManager, "routingManager is null"); + this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null"); } @GET @@ -87,7 +96,15 @@ public Response updateEntity( OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class); gatewayBackendManager.updateBackend(backend); log.info("Marking the cluster %s %s", backend.getName(), backend.isActive() ? "active" : "inactive"); - routingManager.updateBackEndHealth(backend.getName(), backend.isActive()); + // We mark Trino PENDING here so gateway won't immediately route traffic to this cluster yet + // until it is marked healthy by the healthcheck + TrinoStatus trinoStatus = backend.isActive() ? TrinoStatus.PENDING : TrinoStatus.UNHEALTHY; + routingManager.updateBackEndHealth(backend.getName(), trinoStatus); + backendStateManager.updateStates( + backend.getName(), + ClusterStats.builder(backend.getName()) + .trinoStatus(trinoStatus) + .build()); break; case RESOURCE_GROUP: ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload, diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java index 620c2a0e4..05b49bfe4 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java @@ -19,6 +19,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import io.airlift.log.Logger; import io.trino.gateway.ha.clustermonitor.ClusterStats; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import java.util.ArrayList; import java.util.Collections; @@ -45,7 +46,7 @@ static class LocalStats { private int runningQueryCount; private int queuedQueryCount; - private boolean healthy; + private TrinoStatus trinoStatus; private String proxyTo; private String routingGroup; private String clusterId; @@ -56,7 +57,7 @@ static class LocalStats clusterId = stats.clusterId(); runningQueryCount = stats.runningQueryCount(); queuedQueryCount = stats.queuedQueryCount(); - healthy = stats.healthy(); + trinoStatus = stats.trinoStatus(); proxyTo = stats.proxyTo(); routingGroup = stats.routingGroup(); if (stats.userQueuedCount() != null) { @@ -92,14 +93,14 @@ public void queuedQueryCount(int queuedQueryCount) this.queuedQueryCount = queuedQueryCount; } - public boolean healthy() + public TrinoStatus trinoStatus() { - return this.healthy; + return this.trinoStatus; } - public void healthy(boolean healthy) + public void trinoStatus(TrinoStatus trinoStatus) { - this.healthy = healthy; + this.trinoStatus = trinoStatus; } public String proxyTo() @@ -186,7 +187,7 @@ private synchronized Optional getClusterToRoute(String user, String { log.debug("sorting cluster stats for %s %s", user, routingGroup); List filteredList = clusterStats.stream() - .filter(stats -> stats.healthy()) + .filter(stats -> stats.trinoStatus() == TrinoStatus.HEALTHY) .filter(stats -> routingGroup.equals(stats.routingGroup())) .collect(Collectors.toList()); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java index ff005a607..266917f8f 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java @@ -18,6 +18,7 @@ import com.google.common.cache.LoadingCache; import io.airlift.log.Logger; import io.trino.gateway.ha.clustermonitor.ClusterStats; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import jakarta.ws.rs.HttpMethod; @@ -45,7 +46,7 @@ public abstract class RoutingManager private final LoadingCache queryIdBackendCache; private final ExecutorService executorService = Executors.newFixedThreadPool(5); private final GatewayBackendManager gatewayBackendManager; - private final ConcurrentHashMap backendToHealth; + private final ConcurrentHashMap backendToStatus; public RoutingManager(GatewayBackendManager gatewayBackendManager) { @@ -64,7 +65,7 @@ public String load(String queryId) } }); - this.backendToHealth = new ConcurrentHashMap(); + this.backendToStatus = new ConcurrentHashMap<>(); } protected GatewayBackendManager getGatewayBackendManager() @@ -123,16 +124,16 @@ public String findBackendForQueryId(String queryId) return backendAddress; } - public void updateBackEndHealth(String backendId, Boolean value) + public void updateBackEndHealth(String backendId, TrinoStatus value) { log.info("backend %s isHealthy %s", backendId, value); - backendToHealth.put(backendId, value); + backendToStatus.put(backendId, value); } public void updateBackEndStats(List stats) { for (ClusterStats clusterStats : stats) { - updateBackEndHealth(clusterStats.clusterId(), clusterStats.healthy()); + updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus()); } } @@ -183,14 +184,14 @@ protected String findBackendForUnknownQueryId(String queryId) // We are returning the unhealthy (not healthy) private boolean isBackendNotHealthy(String backendId) { - if (backendToHealth.isEmpty()) { + if (backendToStatus.isEmpty()) { log.error("backends can not be empty"); return true; } - Boolean isHealthy = backendToHealth.get(backendId); - if (isHealthy == null) { + TrinoStatus status = backendToStatus.get(backendId); + if (status == null) { return true; } - return !isHealthy; + return status != TrinoStatus.HEALTHY; } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java b/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java index ea390a68d..935fe989b 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java @@ -13,16 +13,18 @@ */ package io.trino.gateway.ha; +import io.airlift.json.JsonCodec; import io.airlift.log.Logger; +import io.trino.gateway.ha.clustermonitor.ClusterStats; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; -import okhttp3.mockwebserver.Dispatcher; +import okhttp3.ResponseBody; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @@ -32,12 +34,15 @@ import java.io.InputStream; import java.net.URL; import java.nio.file.Paths; -import java.util.Map; import java.util.Random; import java.util.Scanner; +import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.net.HttpHeaders.CONTENT_ENCODING; import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -71,23 +76,6 @@ public static void prepareMockBackend( .setResponseCode(200)); } - public static void setPathSpecificResponses( - MockWebServer backend, Map pathResponseMap) - { - Dispatcher dispatcher = new Dispatcher() - { - @Override - public MockResponse dispatch(RecordedRequest request) - { - if (pathResponseMap.containsKey(request.getPath())) { - return new MockResponse().setResponseCode(200).setBody(pathResponseMap.get(request.getPath())); - } - return new MockResponse().setResponseCode(404); - } - }; - backend.setDispatcher(dispatcher); - } - public static TestConfig buildGatewayConfigAndSeedDb(int routerPort, String configFile) throws Exception { @@ -159,6 +147,30 @@ public static void setUpBackend( .build(); Response response = httpClient.newCall(request).execute(); assertThat(response.isSuccessful()).isTrue(); + verifyTrinoStatus(routerPort, name); + } + + private static void verifyTrinoStatus(int port, String name) + throws IOException + { + Request getBackendStateRequest = new Request.Builder() + .url(format("http://localhost:%s/api/public/backends/%s/state", port, name)) + .get() + .build(); + + for (int i = 0; i < 10; i++) { + try (Response getBackendStateResponse = httpClient.newCall(getBackendStateRequest).execute()) { + checkState(getBackendStateResponse.isSuccessful()); + JsonCodec responseCodec = JsonCodec.jsonCodec(ClusterStats.class); + ResponseBody getBackendStateResponseBody = requireNonNull(getBackendStateResponse.body(), "getBackendStateResponse.body() is null"); + ClusterStats clusterStats = responseCodec.fromJson(getBackendStateResponseBody.string()); + if (clusterStats.trinoStatus() == TrinoStatus.HEALTHY) { + return; + } + } + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + throw new IllegalStateException("Trino cluster is not healthy"); } public record TestConfig(String configFilePath, String h2DbFilePath) diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java index af680a6a7..56ebfb5eb 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java @@ -26,7 +26,10 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -37,10 +40,13 @@ import java.io.IOException; import java.util.Base64; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import static com.google.common.collect.MoreCollectors.onlyElement; +import static com.google.common.net.HttpHeaders.CONTENT_TYPE; +import static com.google.common.net.MediaType.JSON_UTF_8; import static org.assertj.core.api.Assertions.assertThat; import static org.testcontainers.utility.MountableFile.forClasspathResource; @@ -84,11 +90,26 @@ void setup() int backend2Port = scheduledTrino.getMappedPort(8080); HaGatewayTestUtils.prepareMockBackend(customBackend, customBackendPort, "default custom response"); - HaGatewayTestUtils.setPathSpecificResponses(customBackend, ImmutableMap.of( - oauthInitiatePath, oauthInitialResponse, - oauthCallbackPath, oauthCallbackResponse, - CUSTOM_PATH, CUSTOM_RESPONSE, - CUSTOM_LOGOUT, "")); + customBackend.setDispatcher(new Dispatcher() { + @Override + public MockResponse dispatch(RecordedRequest request) + { + Map pathResponse = ImmutableMap.of( + oauthInitiatePath, oauthInitialResponse, + oauthCallbackPath, oauthCallbackResponse, + CUSTOM_PATH, CUSTOM_RESPONSE, + CUSTOM_LOGOUT, ""); + if (pathResponse.containsKey(request.getPath())) { + return new MockResponse().setResponseCode(200).setBody(pathResponse.get(request.getPath())); + } + if (request.getPath().equals("/v1/info")) { + return new MockResponse().setResponseCode(200) + .setHeader(CONTENT_TYPE, JSON_UTF_8) + .setBody("{\"starting\": false}"); + } + return new MockResponse().setResponseCode(404); + } + }); // seed database HaGatewayTestUtils.TestConfig testConfig = diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java index 3117d98fe..3a9ab15be 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java @@ -82,6 +82,6 @@ private void testClusterStatsMonitor(Function getClusterStatsList(String routingGroup) { ClusterStats.Builder cluster = ClusterStats.builder("c1"); cluster.proxyTo(BACKEND_URL_1); - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); cluster.routingGroup(routingGroup); cluster.runningQueryCount(50); cluster.queuedQueryCount(SAME_QUERY_COUNT); @@ -63,7 +64,7 @@ private static List getClusterStatsList(String routingGroup) { ClusterStats.Builder cluster = ClusterStats.builder("c2"); cluster.proxyTo(BACKEND_URL_2); - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); cluster.routingGroup(routingGroup); cluster.runningQueryCount(51); cluster.queuedQueryCount(SAME_QUERY_COUNT); @@ -78,7 +79,7 @@ private static List getClusterStatsList(String routingGroup) { ClusterStats.Builder cluster = ClusterStats.builder("c3"); cluster.proxyTo(BACKEND_URL_3); - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); cluster.routingGroup(routingGroup); cluster.runningQueryCount(5); cluster.queuedQueryCount(SAME_QUERY_COUNT); @@ -93,7 +94,7 @@ private static List getClusterStatsList(String routingGroup) { ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy"); cluster.proxyTo("http://c-unhealthy"); - cluster.healthy(false); //This cluster should never show up to route + cluster.trinoStatus(TrinoStatus.UNHEALTHY); //This cluster should never show up to route cluster.routingGroup(routingGroup); cluster.runningQueryCount(5); cluster.queuedQueryCount(SAME_QUERY_COUNT); @@ -105,7 +106,7 @@ private static List getClusterStatsList(String routingGroup) { ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy2"); cluster.proxyTo("http://c-unhealthy2"); - cluster.healthy(false); //This cluster should never show up to route + cluster.trinoStatus(TrinoStatus.UNHEALTHY); //This cluster should never show up to route clustersBuilder.add(cluster.build()); } @@ -115,7 +116,7 @@ private static List getClusterStatsList(String routingGroup) cluster.proxyTo("http://c-messed-up"); //This is a scenrio when, something is really wrong //We just get the cluster state as health but no stats - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); clustersBuilder.add(cluster.build()); } @@ -126,7 +127,7 @@ static ClusterStats getClusterWithNoUserQueueAndMinQueueCount() { ClusterStats.Builder cluster = ClusterStats.builder("c-Minimal-Queue"); cluster.proxyTo(BACKEND_URL_4); - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); cluster.routingGroup("adhoc"); cluster.runningQueryCount(5); cluster.queuedQueryCount(LEAST_QUEUED_COUNT); @@ -137,7 +138,7 @@ static ClusterStats getClusterWithMinRunningQueries() { ClusterStats.Builder cluster = ClusterStats.builder("c-Minimal-Running"); cluster.proxyTo(BACKEND_URL_5); - cluster.healthy(true); + cluster.trinoStatus(TrinoStatus.HEALTHY); cluster.routingGroup("adhoc"); cluster.runningQueryCount(1); cluster.queuedQueryCount(LEAST_QUEUED_COUNT); diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java index d9afbeaa3..d891dbfe8 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java @@ -13,6 +13,7 @@ */ package io.trino.gateway.ha.router; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.BeforeAll; @@ -54,8 +55,8 @@ void testAddMockBackends() proxyBackend.setProxyTo(backend + ".trino.example.com"); proxyBackend.setExternalUrl("trino.example.com"); backendManager.addBackend(proxyBackend); - //set backend as healthy start with - haRoutingManager.updateBackEndHealth(backend, true); + //set backend as healthy to start with + haRoutingManager.updateBackEndHealth(backend, TrinoStatus.HEALTHY); } //Keep only 1st backend as healthy, mark all the others as unhealthy @@ -63,7 +64,7 @@ void testAddMockBackends() for (int i = 1; i < numBackends; i++) { backend = groupName + i; - haRoutingManager.updateBackEndHealth(backend, false); + haRoutingManager.updateBackEndHealth(backend, TrinoStatus.UNHEALTHY); } assertThat(haRoutingManager.provideBackendForRoutingGroup(groupName, "")) diff --git a/gateway-ha/src/test/resources/test-config-template.yml b/gateway-ha/src/test/resources/test-config-template.yml index beafe30c7..81ea1b461 100644 --- a/gateway-ha/src/test/resources/test-config-template.yml +++ b/gateway-ha/src/test/resources/test-config-template.yml @@ -11,6 +11,17 @@ dataStore: modules: - io.trino.gateway.ha.module.HaGatewayProviderModule + - io.trino.gateway.ha.module.ClusterStateListenerModule + - io.trino.gateway.ha.module.ClusterStatsMonitorModule + +managedApps: + - io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor + +clusterStatsConfiguration: + monitorType: INFO_API + +monitor: + taskDelaySeconds: 1 extraWhitelistPaths: - '/v1/custom.*' diff --git a/gateway-ha/src/test/resources/test-config-with-routing-template.yml b/gateway-ha/src/test/resources/test-config-with-routing-template.yml index 84a5b0f32..817b93e20 100644 --- a/gateway-ha/src/test/resources/test-config-with-routing-template.yml +++ b/gateway-ha/src/test/resources/test-config-with-routing-template.yml @@ -10,6 +10,17 @@ dataStore: modules: - io.trino.gateway.ha.module.HaGatewayProviderModule + - io.trino.gateway.ha.module.ClusterStateListenerModule + - io.trino.gateway.ha.module.ClusterStatsMonitorModule + +managedApps: + - io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor + +clusterStatsConfiguration: + monitorType: INFO_API + +monitor: + taskDelaySeconds: 1 extraWhitelistPaths: - '/v1/custom.*' diff --git a/gateway-ha/src/test/resources/test-config-without-x-forwarded-template.yml b/gateway-ha/src/test/resources/test-config-without-x-forwarded-template.yml index 15eb4b0cc..4be161476 100644 --- a/gateway-ha/src/test/resources/test-config-without-x-forwarded-template.yml +++ b/gateway-ha/src/test/resources/test-config-without-x-forwarded-template.yml @@ -10,6 +10,17 @@ dataStore: modules: - io.trino.gateway.ha.module.HaGatewayProviderModule + - io.trino.gateway.ha.module.ClusterStateListenerModule + - io.trino.gateway.ha.module.ClusterStatsMonitorModule + +managedApps: + - io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor + +clusterStatsConfiguration: + monitorType: INFO_API + +monitor: + taskDelaySeconds: 1 extraWhitelistPaths: - '/v1/custom.*'