Skip to content

Commit

Permalink
Export clusterstats to jmx
Browse files Browse the repository at this point in the history
  • Loading branch information
wchoi authored and choiwaiyiu committed Oct 2, 2024
1 parent 897118c commit 7f4af7e
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.gateway.ha.resource.LoginResource;
import io.trino.gateway.ha.resource.PublicResource;
import io.trino.gateway.ha.resource.TrinoResource;
import io.trino.gateway.ha.router.BackendStateMBeanExporter;
import io.trino.gateway.ha.security.AuthorizedExceptionMapper;
import io.trino.gateway.proxyserver.ForProxy;
import io.trino.gateway.proxyserver.ProxyRequestHandler;
Expand Down Expand Up @@ -125,6 +126,7 @@ public void configure(Binder binder)
jaxrsBinder(binder).bind(AuthorizedExceptionMapper.class);
binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName();
binder.bind(BackendStateMBeanExporter.class).in(Scopes.SINGLETON);
}

private static void addManagedApps(HaGatewayConfiguration configuration, Binder binder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
*/
package io.trino.gateway.ha.clustermonitor;

import io.trino.gateway.ha.router.BackendStateMBeanExporter;
import io.trino.gateway.ha.router.BackendStateManager;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class ClusterStatsObserver
implements TrinoClusterStatsObserver
{
private final BackendStateManager backendStateManager;
private final BackendStateMBeanExporter backendStateMBeanExporter;

public ClusterStatsObserver(BackendStateManager backendStateManager)
public ClusterStatsObserver(BackendStateManager backendStateManager, BackendStateMBeanExporter backendStateMBeanExporter)
{
this.backendStateManager = backendStateManager;
this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null");
this.backendStateMBeanExporter = requireNonNull(backendStateMBeanExporter, "backendStateMBeanExporter is null");
}

@Override
Expand All @@ -33,5 +38,6 @@ public void observe(List<ClusterStats> clustersStats)
for (ClusterStats clusterStats : clustersStats) {
backendStateManager.updateStates(clusterStats.clusterId(), clusterStats);
}
backendStateMBeanExporter.updateExport();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.gateway.ha.clustermonitor.TrinoClusterStatsObserver;
import io.trino.gateway.ha.config.HaGatewayConfiguration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.router.BackendStateMBeanExporter;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.RoutingManager;

Expand All @@ -46,11 +47,12 @@ public ClusterStateListenerModule(HaGatewayConfiguration config)
@Singleton
public List<TrinoClusterStatsObserver> getClusterStatsObservers(
RoutingManager mgr,
BackendStateManager backendStateManager)
BackendStateManager backendStateManager,
BackendStateMBeanExporter backendStateMBeanExporter)
{
return ImmutableList.<TrinoClusterStatsObserver>builder()
.add(new HealthCheckObserver(mgr))
.add(new ClusterStatsObserver(backendStateManager))
.add(new ClusterStatsObserver(backendStateManager, backendStateMBeanExporter))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.router;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.weakref.jmx.MBeanExport;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class BackendStateMBeanExporter
{
@GuardedBy("this")
private final List<MBeanExport> mbeanExports = new ArrayList<>();

private final MBeanExporter exporter;
private final BackendStateManager backendStateManager;
private final Map<String, ClusterStatsJMX> backendStates = new HashMap<>();

@Inject
public BackendStateMBeanExporter(MBeanExporter exporter, BackendStateManager backendStateManager)
{
this.exporter = requireNonNull(exporter, "exporter is null");
this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null");
}

@PostConstruct
public synchronized void updateExport()
{
for (ClusterStats clusterStats : backendStateManager.getAllBackendStates().values()) {
String clusterId = clusterStats.clusterId();

if (backendStates.containsKey(clusterId)) {
ClusterStatsJMX clusterStatsJMX = backendStates.get(clusterId);
clusterStatsJMX.setFrom(clusterStats);
}
else {
ClusterStatsJMX clusterStatsJMX = new ClusterStatsJMX();
clusterStatsJMX.setFrom(clusterStats);
mbeanExports.add(exporter.exportWithGeneratedName(
clusterStatsJMX,
ClusterStatsJMX.class,
ImmutableMap.<String, String>builder()
.put("name", "ClusterStats")
.put("backend", clusterId)
.build()));
backendStates.put(clusterId, clusterStatsJMX);
}
}
}

@PreDestroy
public synchronized void unexport()
{
for (MBeanExport mbeanExport : mbeanExports) {
mbeanExport.unexport();
}
mbeanExports.clear();
}

public static class ClusterStatsJMX
{
private int numWorkerNodes;
private boolean healthy;
private String proxyTo;
private String externalUrl;
private String routingGroup;

public void setFrom(ClusterStats clusterStats)
{
numWorkerNodes = clusterStats.numWorkerNodes();
healthy = clusterStats.healthy();
proxyTo = clusterStats.proxyTo();
externalUrl = clusterStats.externalUrl();
routingGroup = clusterStats.routingGroup();
}

@Managed
public int getNumWorkerNodes()
{
return numWorkerNodes;
}

@Managed
public boolean isHealthy()
{
return healthy;
}

@Managed
public String getProxyTo()
{
return proxyTo;
}

@Managed
public String getExternalUrl()
{
return externalUrl;
}

@Managed
public String getRoutingGroup()
{
return routingGroup;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public ClusterStats getBackendState(ProxyBackendConfiguration backend)
return clusterStats.getOrDefault(name, ClusterStats.builder(name).build());
}

public Map<String, ClusterStats> getAllBackendStates()
{
return clusterStats;
}

public void updateStates(String clusterId, ClusterStats stats)
{
clusterStats.put(clusterId, stats);
Expand Down

0 comments on commit 7f4af7e

Please sign in to comment.