From 009d9c841daf7bf65e7724b39d144fc298c0d92d Mon Sep 17 00:00:00 2001 From: Choi Wai Yiu Date: Wed, 31 Jul 2024 08:51:40 +0900 Subject: [PATCH] Export clusterstats to jmx --- .../io/trino/gateway/baseapp/BaseApp.java | 2 + .../clustermonitor/ClusterStatsObserver.java | 10 +- .../ha/module/ClusterStateListenerModule.java | 6 +- .../ha/router/BackendStateMBeanExporter.java | 130 ++++++++++++++++++ .../ha/router/BackendStateManager.java | 5 + 5 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateMBeanExporter.java diff --git a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java index 250e7bc5d..1c1a52788 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java +++ b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java @@ -32,6 +32,7 @@ import io.trino.gateway.ha.resource.LoginResource; import io.trino.gateway.ha.resource.PublicResource; import io.trino.gateway.ha.resource.TrinoResource; +import io.trino.gateway.ha.router.BackendStateMBeanExporter; import io.trino.gateway.ha.security.AuthorizedExceptionMapper; import io.trino.gateway.proxyserver.ForProxy; import io.trino.gateway.proxyserver.ProxyRequestHandler; @@ -125,6 +126,7 @@ public void configure(Binder binder) jaxrsBinder(binder).bind(AuthorizedExceptionMapper.class); binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON); newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName(); + binder.bind(BackendStateMBeanExporter.class).in(Scopes.SINGLETON); } private static void addManagedApps(HaGatewayConfiguration configuration, Binder binder) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsObserver.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsObserver.java index 6729b1e91..3e2edbe45 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsObserver.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsObserver.java @@ -13,18 +13,23 @@ */ package io.trino.gateway.ha.clustermonitor; +import io.trino.gateway.ha.router.BackendStateMBeanExporter; import io.trino.gateway.ha.router.BackendStateManager; import java.util.List; +import static java.util.Objects.requireNonNull; + public class ClusterStatsObserver implements TrinoClusterStatsObserver { private final BackendStateManager backendStateManager; + private final BackendStateMBeanExporter backendStateMBeanExporter; - public ClusterStatsObserver(BackendStateManager backendStateManager) + public ClusterStatsObserver(BackendStateManager backendStateManager, BackendStateMBeanExporter backendStateMBeanExporter) { - this.backendStateManager = backendStateManager; + this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null"); + this.backendStateMBeanExporter = requireNonNull(backendStateMBeanExporter, "backendStateMBeanExporter is null"); } @Override @@ -33,5 +38,6 @@ public void observe(List clustersStats) for (ClusterStats clusterStats : clustersStats) { backendStateManager.updateStates(clusterStats.clusterId(), clusterStats); } + backendStateMBeanExporter.updateExport(); } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStateListenerModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStateListenerModule.java index 7ddf9c7fb..fb942798b 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStateListenerModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStateListenerModule.java @@ -23,6 +23,7 @@ import io.trino.gateway.ha.clustermonitor.TrinoClusterStatsObserver; import io.trino.gateway.ha.config.HaGatewayConfiguration; import io.trino.gateway.ha.config.MonitorConfiguration; +import io.trino.gateway.ha.router.BackendStateMBeanExporter; import io.trino.gateway.ha.router.BackendStateManager; import io.trino.gateway.ha.router.RoutingManager; @@ -46,11 +47,12 @@ public ClusterStateListenerModule(HaGatewayConfiguration config) @Singleton public List getClusterStatsObservers( RoutingManager mgr, - BackendStateManager backendStateManager) + BackendStateManager backendStateManager, + BackendStateMBeanExporter backendStateMBeanExporter) { return ImmutableList.builder() .add(new HealthCheckObserver(mgr)) - .add(new ClusterStatsObserver(backendStateManager)) + .add(new ClusterStatsObserver(backendStateManager, backendStateMBeanExporter)) .build(); } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateMBeanExporter.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateMBeanExporter.java new file mode 100644 index 000000000..69b063603 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateMBeanExporter.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import io.trino.gateway.ha.clustermonitor.ClusterStats; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.weakref.jmx.MBeanExport; +import org.weakref.jmx.MBeanExporter; +import org.weakref.jmx.Managed; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class BackendStateMBeanExporter +{ + @GuardedBy("this") + private final List mbeanExports = new ArrayList<>(); + + private final MBeanExporter exporter; + private final BackendStateManager backendStateManager; + private final Map backendStates = new HashMap<>(); + + @Inject + public BackendStateMBeanExporter(MBeanExporter exporter, BackendStateManager backendStateManager) + { + this.exporter = requireNonNull(exporter, "exporter is null"); + this.backendStateManager = requireNonNull(backendStateManager, "backendStateManager is null"); + } + + @PostConstruct + public synchronized void updateExport() + { + for (ClusterStats clusterStats : backendStateManager.getAllBackendStates().values()) { + String clusterId = clusterStats.clusterId(); + + if (backendStates.containsKey(clusterId)) { + ClusterStatsJMX clusterStatsJMX = backendStates.get(clusterId); + clusterStatsJMX.setFrom(clusterStats); + } + else { + ClusterStatsJMX clusterStatsJMX = new ClusterStatsJMX(); + clusterStatsJMX.setFrom(clusterStats); + mbeanExports.add(exporter.exportWithGeneratedName( + clusterStatsJMX, + ClusterStatsJMX.class, + ImmutableMap.builder() + .put("name", "ClusterStats") + .put("backend", clusterId) + .build())); + backendStates.put(clusterId, clusterStatsJMX); + } + } + } + + @PreDestroy + public synchronized void unexport() + { + for (MBeanExport mbeanExport : mbeanExports) { + mbeanExport.unexport(); + } + mbeanExports.clear(); + } + + public static class ClusterStatsJMX + { + private int numWorkerNodes; + private boolean healthy; + private String proxyTo; + private String externalUrl; + private String routingGroup; + + public void setFrom(ClusterStats clusterStats) + { + numWorkerNodes = clusterStats.numWorkerNodes(); + healthy = clusterStats.healthy(); + proxyTo = clusterStats.proxyTo(); + externalUrl = clusterStats.externalUrl(); + routingGroup = clusterStats.routingGroup(); + } + + @Managed + public int getNumWorkerNodes() + { + return numWorkerNodes; + } + + @Managed + public boolean isHealthy() + { + return healthy; + } + + @Managed + public String getProxyTo() + { + return proxyTo; + } + + @Managed + public String getExternalUrl() + { + return externalUrl; + } + + @Managed + public String getRoutingGroup() + { + return routingGroup; + } + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java index 8fa7de46e..84c07bed8 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java @@ -34,6 +34,11 @@ public ClusterStats getBackendState(ProxyBackendConfiguration backend) return clusterStats.getOrDefault(name, ClusterStats.builder(name).build()); } + public Map getAllBackendStates() + { + return clusterStats; + } + public void updateStates(String clusterId, ClusterStats stats) { clusterStats.put(clusterId, stats);