Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced 'Pending' state for backend's health #91

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-testing</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public HaGatewayLauncher(String... basePackages) {
super(basePackages);
}

/**
 * Creates a launcher without supplying any base packages.
 *
 * <p>NOTE(review): presumably added so that a test harness can instantiate the
 * application through a no-argument constructor — confirm against the tests.
 */
public HaGatewayLauncher(){}

@Override
public void initialize(Bootstrap<HaGatewayConfiguration> bootstrap) {
super.initialize(bootstrap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.trino.gateway.ha.clustermonitor;

/**
 * Health of a backend cluster as tracked by the gateway.
 *
 * <p>Only {@link #HEALTHY} counts as healthy wherever this state is checked;
 * both {@link #PENDING} and {@link #UNHEALTHY} are treated as "not healthy".
 */
public enum BackendHealthState {
/** The cluster monitor fetched the backend's stats and judged it usable. */
HEALTHY,
/** Initial state: the backend is known but has not yet been classified by a monitor. */
PENDING,
/** The cluster monitor judged the backend unusable (for example, no active workers). */
UNHEALTHY
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ClusterStats {
private int queuedQueryCount;
private int blockedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private BackendHealthState healthy = BackendHealthState.PENDING;
private String clusterId;
private String proxyTo;
private String externalUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) {
HashMap<String, Object> result = null;
result = new ObjectMapper().readValue(response, HashMap.class);

clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0);
clusterStats.setNumWorkerNodes((int) result.getOrDefault("activeWorkers", 0));
clusterStats.setQueuedQueryCount((int) result.getOrDefault("queuedQueries", 0));
clusterStats.setRunningQueryCount((int) result.getOrDefault("runningQueries", 0));
clusterStats.setBlockedQueryCount((int) result.getOrDefault("blockedQueries", 0));
clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0 ? BackendHealthState.HEALTHY : BackendHealthState.UNHEALTHY);
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setExternalUrl(backend.getExternalUrl());
clusterStats.setRoutingGroup(backend.getRoutingGroup());

} catch (Exception e) {
log.error("Error parsing cluster stats from [{}]", response, e);
log.error("Error parsing cluster stats from {}", response, e);
}

// Fetch User Level Stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) {
while (rs.next()) {
partialState.put(rs.getString("state"), rs.getInt("count"));
}
clusterStats.setHealthy(true);
clusterStats.setHealthy(BackendHealthState.HEALTHY);
clusterStats.setQueuedQueryCount(partialState.getOrDefault("QUEUED", 0));
clusterStats.setRunningQueryCount(partialState.getOrDefault("RUNNING", 0));
return clusterStats;
} catch (TimeoutException e) {
log.error("timed out fetching status for {} backend, {}", url, e);
log.error("timed out fetching status for {} backend, {}", jdbcUrl, e);
} catch (Exception e) {
log.error("could not fetch status for {} backend, {}", url, e);
log.error("could not fetch status for {} backend, {}", jdbcUrl, e);
}
return clusterStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ public HealthCheckObserver(RoutingManager routingManager) {
@Override
public void observe(java.util.List<ClusterStats> clustersStats) {
for (ClusterStats clusterStats : clustersStats) {
routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.isHealthy());
routingManager.updateBackEndHealthDB(clusterStats);
routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.getHealthy());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public HealthChecker(Notifier notifier) {
@Override
public void observe(List<ClusterStats> clustersStats) {
for (ClusterStats clusterStats : clustersStats) {
if (!clusterStats.isHealthy()) {
if (clusterStats.getHealthy() != BackendHealthState.HEALTHY) {
notifyUnhealthyCluster(clusterStats);
} else {
if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void observe(List<ClusterStats> stats) {
= new HashMap<>();

for (ClusterStats stat : stats) {
if (!stat.isHealthy()) {
if (stat.getHealthy() != BackendHealthState.HEALTHY) {
// Skip if the cluster isn't healthy
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.dropwizard.views.common.View;
import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.GatewayBackendManager;
import io.trino.gateway.ha.router.ResourceGroupsManager;
import io.trino.gateway.ha.router.RoutingManager;
Expand Down Expand Up @@ -43,6 +45,9 @@ public class EntityEditorResource {
@Inject
private ResourceGroupsManager resourceGroupsManager;

@Inject
private BackendStateManager backendStateManager;

@Inject
private RoutingManager routingManager;

Expand Down Expand Up @@ -75,7 +80,8 @@ public Response updateEntity(@QueryParam("entityType") String entityTypeStr,
OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class);
gatewayBackendManager.updateBackend(backend);
log.info("Setting up the backend {} with healthy state", backend.getName());
routingManager.upateBackEndHealth(backend.getName(), true);
routingManager.upateBackEndHealth(backend.getName(), BackendHealthState.PENDING);
backendStateManager.updateHealthState(backend.getName(), BackendHealthState.PENDING);
break;
case RESOURCE_GROUP:
ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.trino.gateway.ha.router;

import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
Expand Down Expand Up @@ -29,7 +30,7 @@ public BackendState getBackendState(ProxyBackendConfiguration backend) {
Map<String, Integer> state = new HashMap<>();
state.put("QUEUED", stats.getQueuedQueryCount());
state.put("RUNNING", stats.getRunningQueryCount());
return new BackendState(name, state);
return new BackendState(name, state, stats.getHealthy());
}

public BackendStateConfiguration getBackendStateConfiguration() {
Expand All @@ -40,14 +41,22 @@ public void updateStates(String clusterId, ClusterStats stats) {
clusterStats.put(clusterId, stats);
}

/**
 * Records the health state of a single backend.
 *
 * <p>If no stats are tracked yet for {@code clusterId}, a fresh {@link ClusterStats}
 * is created and stored first; otherwise the existing entry is updated in place.
 * {@code computeIfAbsent} avoids allocating a throwaway {@code ClusterStats} on every
 * call and replaces the separate lookup and put with a single map operation.
 *
 * @param clusterId name of the backend whose state is being recorded
 * @param state the health state to store for that backend
 */
public void updateHealthState(String clusterId, BackendHealthState state) {
  clusterStats.computeIfAbsent(clusterId, id -> new ClusterStats()).setHealthy(state);
}

@Data
public static class BackendState {
private final String name;
private final Map<String, Integer> state;
private final BackendHealthState healthy;

public BackendState(String name, Map<String, Integer> state) {
public BackendState(String name, Map<String, Integer> state, BackendHealthState healthy) {
this.name = name;
this.state = state;
this.healthy = healthy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.proxyserver.ProxyServerConfiguration;
Expand Down Expand Up @@ -31,7 +32,7 @@ public abstract class RoutingManager {
private final LoadingCache<String, String> queryIdBackendCache;
private ExecutorService executorService = Executors.newFixedThreadPool(5);
private GatewayBackendManager gatewayBackendManager;
private ConcurrentHashMap<String, Boolean> backendToHealth;
private ConcurrentHashMap<String, BackendHealthState> backendToHealth;

public RoutingManager(GatewayBackendManager gatewayBackendManager) {
this.gatewayBackendManager = gatewayBackendManager;
Expand All @@ -47,7 +48,7 @@ public String load(String queryId) {
}
});

this.backendToHealth = new ConcurrentHashMap<String, Boolean>();
this.backendToHealth = new ConcurrentHashMap<>();
}

protected GatewayBackendManager getGatewayBackendManager() {
Expand Down Expand Up @@ -109,21 +110,11 @@ public String findBackendForQueryId(String queryId) {
return backendAddress;
}

public void upateBackEndHealth(String backendId, Boolean value) {
public void upateBackEndHealth(String backendId, BackendHealthState value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: because you're modifying the method signature we might as well fix the typo and consistency of BackEnd vs Backend: upateBackEndHealth -> updateBackendHealth

(feel free to resolve with or without the fix, i'm poking my head in here and might not see the response :D)

log.info("backend {} isHealthy {}", backendId, value);
backendToHealth.put(backendId, value);
}

public void updateBackEndHealthDB(ClusterStats stats) {
String name = stats.getClusterId();
if (stats.isHealthy()) {
gatewayBackendManager.activateBackend(name);
} else {
gatewayBackendManager.deactivateBackend(name);
}
}


/**
* This tries to find out which backend may have info about given query id. If not found returns
* the first healthy backend.
Expand Down Expand Up @@ -176,11 +167,11 @@ private boolean isBackendNotHealthy(String backendId) {
log.error("backends can not be empty");
return true;
}
Boolean isHealthy = backendToHealth.get(backendId);
BackendHealthState isHealthy = backendToHealth.get(backendId);
if (isHealthy == null) {
return true;
}
return !isHealthy;
return isHealthy != BackendHealthState.HEALTHY;
}

}
16 changes: 15 additions & 1 deletion gateway-ha/src/main/resources/template/gateway-view.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@
background-color: red;
}
.active_HEALTHY {
background-color: green;
}
.active_UNHEALTHY {
background-color: red;
}
.active_PENDING {
background-color: grey;
}
#availableClusters {
width: 75%;
border: 1px solid #ddd;
Expand Down Expand Up @@ -67,6 +79,7 @@
<th>Url</th>
<th>Group</th>
<th>Active</th>
<th>Healthy</th>
<#if backendStates?keys?size != 0>
<th>Queued<th>
<th>Running<th>
Expand All @@ -76,10 +89,11 @@
<tbody>
<#list backendConfigurations as bc>
<tr>
<td> ${bc.name}</td>
<td>${bc.name}</td>
<td><a href="${bc.externalUrl}/ui" target="_blank">${bc.externalUrl}</a></td>
<td> ${bc.routingGroup}</td>
<td class="active_${bc.active?c}"> ${bc.active?c} </td>
<#-- A backend may have no recorded state yet; fall back to PENDING instead of failing the whole page render. -->
<td class="active_${(backendStates[bc.name].healthy)!"PENDING"}">${(backendStates[bc.name].healthy)!"PENDING"}</td>
<#if backendStates?keys?size != 0 && backendStates[bc.name]??>
<td>${backendStates[bc.name].state["QUEUED"]}<td>
<td>${backendStates[bc.name].state["RUNNING"]}<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class HaGatewayTestUtils {
private static final OkHttpClient httpClient = new OkHttpClient();
private static final Random RANDOM = new Random();

public static final int WAIT_FOR_BACKEND_IN_SECONDS = 65;

public static void seedRequiredData(TestConfig testConfig) {
String jdbcUrl = "jdbc:h2:" + testConfig.getH2DbFilePath();
DataStoreConfiguration db = new DataStoreConfiguration(jdbcUrl, "sa", "sa", "org.h2.Driver", 4);
Expand All @@ -40,16 +42,26 @@ public static void seedRequiredData(TestConfig testConfig) {
connectionManager.close();
}

public static void prepareMockBackend(
WireMockServer backend, String endPoint, String expectedResonse) {
backend.start();
public static void prepareMockPostBackend(
WireMockServer backend, String endPoint, String expectedResonse, int status) {
backend.stubFor(
WireMock.post(endPoint)
.willReturn(
WireMock.aResponse()
.withBody(expectedResonse)
.withHeader("Content-Encoding", "plain")
.withStatus(200)));
.withStatus(status)));
}

/**
 * Registers a stub on the given WireMock server that answers GET requests to
 * {@code endPoint} with the supplied body and HTTP status.
 *
 * <p>This method only adds the stub; it does not start the server itself.
 *
 * <p>NOTE(review): the stubbed response carries {@code Content-Encoding: plain},
 * which does not look like a standard content coding — confirm that clients
 * under test ignore it.
 *
 * @param backend WireMock server to register the stub on
 * @param endPoint URL the stub should match
 * @param response body returned by the stub
 * @param status HTTP status code returned by the stub
 */
public static void prepareMockGetBackend(
WireMockServer backend, String endPoint, String response, int status) {
backend.stubFor(
WireMock.get(endPoint)
.willReturn(
WireMock.aResponse()
.withBody(response)
.withHeader("Content-Encoding", "plain")
.withStatus(status)));
}

public static TestConfig buildGatewayConfigAndSeedDb(int routerPort, String configFile)
Expand Down
Loading
Loading