Skip to content

Commit

Permalink
Refactor cluster stats monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 1, 2024
1 parent fc9aa8f commit 258e7ff
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ public class ClusterStatsHttpMonitor
private static final Logger log = LoggerFactory.getLogger(ClusterStatsHttpMonitor.class);
private static final String SESSION_USER = "sessionUser";

private final BackendStateConfiguration backendStateConfiguration;
private final String username;
private final String password;

public ClusterStatsHttpMonitor(BackendStateConfiguration backendStateConfiguration)
{
this.backendStateConfiguration = backendStateConfiguration;
username = backendStateConfiguration.getUsername();
password = backendStateConfiguration.getPassword();
}

@Override
Expand Down Expand Up @@ -111,8 +113,8 @@ private OkHttpClient acquireClientWithCookie(String loginUrl)
UiApiCookieJar cookieJar = new UiApiCookieJar();
OkHttpClient client = new OkHttpClient.Builder().cookieJar(cookieJar).build();
RequestBody formBody = new FormBody.Builder()
.add("username", backendStateConfiguration.getUsername())
.add("password", backendStateConfiguration.getPassword())
.add("username", username)
.add("password", password)
.build();
Request loginRequest = new Request.Builder()
.url(HttpUrl.parse(loginUrl))
Expand Down Expand Up @@ -155,7 +157,7 @@ private String queryCluster(ProxyBackendConfiguration backend, String path)
case HttpStatus.UNAUTHORIZED_401:
log.info("Unauthorized to fetch cluster stats");
log.debug("username: {}, targetUrl: {}, cookieStore: {}",
backendStateConfiguration.getUsername(),
username,
targetUrl,
client.cookieJar().loadForRequest(HttpUrl.parse(targetUrl)));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.util.concurrent.SimpleTimeLimiter;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,10 +36,8 @@ public class ClusterStatsJdbcMonitor
implements ClusterStatsMonitor
{
private static final Logger log = LoggerFactory.getLogger(ClusterStatsJdbcMonitor.class);
@Nullable
private final Properties properties;

private final BackendStateConfiguration backendStateConfiguration;
private final Properties properties; // TODO Avoid using a mutable field

private static final String STATE_QUERY = "SELECT state, COUNT(*) as count "
+ "FROM runtime.queries "
Expand All @@ -49,20 +46,11 @@ public class ClusterStatsJdbcMonitor

public ClusterStatsJdbcMonitor(BackendStateConfiguration backendStateConfiguration)
{
this.backendStateConfiguration = backendStateConfiguration;
if (backendStateConfiguration != null) {
properties = new Properties();
properties.setProperty("user", backendStateConfiguration.getUsername());
if (backendStateConfiguration.getPassword() != null) {
properties.setProperty("password", backendStateConfiguration.getPassword());
}
properties.setProperty("SSL", String.valueOf(backendStateConfiguration.getSsl()));
log.info("state check configured");
}
else {
log.warn("no state check configured");
properties = null;
}
properties = new Properties();
properties.setProperty("user", backendStateConfiguration.getUsername());
properties.setProperty("password", backendStateConfiguration.getPassword());
properties.setProperty("SSL", String.valueOf(backendStateConfiguration.getSsl()));
log.info("state check configured");
}

@Override
Expand All @@ -72,9 +60,6 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());
String jdbcUrl;
if (backendStateConfiguration == null) {
return clusterStats;
}
try {
URL parsedUrl = new URL(url);
jdbcUrl = String
Expand All @@ -86,13 +71,13 @@ public ClusterStats monitor(ProxyBackendConfiguration backend)
}
catch (MalformedURLException e) {
log.error("could not parse backend url {} ", url);
return clusterStats;
return clusterStats; // TODO Invalid configuration should fail
}

try (Connection conn = DriverManager.getConnection(jdbcUrl, properties)) {
PreparedStatement stmt = SimpleTimeLimiter.create(Executors.newSingleThreadExecutor())
.callWithTimeout(() -> conn.prepareStatement(STATE_QUERY), 10, TimeUnit.SECONDS);
stmt.setString(1, backendStateConfiguration.getUsername());
stmt.setString(1, (String) properties.get("user"));
Map<String, Integer> partialState = new HashMap<>();
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import io.trino.gateway.ha.config.ProxyBackendConfiguration;

public class NoopClusterStatsMonitor
implements ClusterStatsMonitor
{
@Override
public ClusterStats monitor(ProxyBackendConfiguration backend)
{
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());
return clusterStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor;
import io.trino.gateway.ha.config.ClusterStatsConfiguration;
import io.trino.gateway.ha.config.HaGatewayConfiguration;

Expand All @@ -39,6 +40,9 @@ public ClusterStatsMonitorModule(HaGatewayConfiguration config, Environment env)
public ClusterStatsMonitor getClusterStatsMonitor()
{
ClusterStatsConfiguration clusterStatsConfig = config.getClusterStatsConfiguration();
if (config.getBackendState() == null) {
return new NoopClusterStatsMonitor();
}
if (clusterStatsConfig.isUseApi()) {
return new ClusterStatsHttpMonitor(config.getBackendState());
}
Expand Down

0 comments on commit 258e7ff

Please sign in to comment.