From df6c065edc87c4316d5051772c5bc955d9bc6477 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 5 Oct 2023 17:21:01 +0530 Subject: [PATCH] changes for node perf tracker Signed-off-by: Bharathwaj G --- .../main/java/org/opensearch/node/Node.java | 12 +++ .../tracker/NodePerformanceTracker.java | 96 ++++++++++++++++- .../tracker/PerformanceTrackerSettings.java | 101 +++++++++++++++++- 3 files changed, 207 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 729b38ca27394..0062ffe91c44d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -195,6 +195,7 @@ import org.opensearch.plugins.SearchPlugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.ratelimiting.tracker.NodePerformanceTracker; import org.opensearch.repositories.RepositoriesModule; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; @@ -1118,6 +1119,12 @@ protected Node( ); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker( + threadPool, + settings, + clusterService.getClusterSettings(), + monitorService.fsService() + ); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -1206,6 +1213,7 @@ protected Node( b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); + b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker); }); injector = modules.createInjector(); @@ -1315,6 +1323,7 @@ public Node start() throws NodeValidationException { injector.getInstance(RepositoriesService.class).start(); injector.getInstance(SearchService.class).start(); injector.getInstance(FsHealthService.class).start(); + injector.getInstance(NodePerformanceTracker.class).start(); nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); @@ -1482,6 +1491,7 @@ private Node stop() { injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); + injector.getInstance(NodePerformanceTracker.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); @@ -1547,6 +1557,8 @@ public synchronized void close() throws IOException { toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); toClose.add(nodeService.getTaskCancellationMonitoringService()); + toClose.add(() -> stopWatch.stop().start("node_performance_tracker")); + toClose.add(injector.getInstance(NodePerformanceTracker.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java index 0c29c7dba7dba..93c393b04ed35 100644 --- a/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java +++ b/server/src/main/java/org/opensearch/ratelimiting/tracker/NodePerformanceTracker.java @@ -1,2 +1,96 @@ -package org.opensearch.ratelimiting.tracker;public class NodePerformanceTracker { +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimiting.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * This tracks the performance of node resources such as CPU, IO and memory + */ +public class NodePerformanceTracker extends AbstractLifecycleComponent { + private ThreadPool threadPool; + private final ClusterSettings clusterSettings; + private AverageIOUsageTracker ioUsageTracker; + + private AverageDiskStats averageDiskStats; + private PerformanceTrackerSettings performanceTrackerSettings; + private FsService fsService; + private volatile Scheduler.Cancellable scheduledFuture; + private static final Logger logger = LogManager.getLogger(NodePerformanceTracker.class); + + public NodePerformanceTracker(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings, FsService fsService) { + this.threadPool = threadPool; + this.clusterSettings = clusterSettings; + this.performanceTrackerSettings = new PerformanceTrackerSettings(settings, clusterSettings); + this.fsService = fsService; + initialize(); + } + + void initialize() { + ioUsageTracker = new AverageIOUsageTracker( + threadPool, + new TimeValue(1, TimeUnit.SECONDS), + new TimeValue(60, TimeUnit.SECONDS), + clusterSettings, + fsService + ); + } + + private AverageDiskStats getAverageIOUsed() { + return ioUsageTracker.getAverageDiskStats(); + } + + private void setAverageDiskStats(AverageDiskStats averageDiskStats) { + this.averageDiskStats = averageDiskStats; + } + + private AverageDiskStats getAverageDiskStats() { + return averageDiskStats; + } + + private void doRun() { + setAverageDiskStats(getAverageIOUsed()); + logger.info(getAverageDiskStats().toString()); + } + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + try { + doRun(); + } catch (Exception e) { + + } + }, new TimeValue(1, TimeUnit.SECONDS), ThreadPool.Names.GENERIC); + ioUsageTracker.doStart(); + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + ioUsageTracker.doStop(); + } + + @Override + protected void doClose() throws IOException { + ioUsageTracker.doClose(); + } } diff --git a/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java b/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java index f79c17dade75d..62ba3061153e7 100644 --- a/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimiting/tracker/PerformanceTrackerSettings.java @@ -1,2 +1,101 @@ -package org.opensearch.ratelimiting.tracker;public class PerformanceTrackerSettings { +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimiting.tracker; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +/** + * Settings related to node performance trackers such as polling interval, window duration etc + */ +public class PerformanceTrackerSettings { + + private static class Defaults { + /** + * This is the default polling interval of usage trackers to get the resource utilization data + */ + private static final long POLLING_INTERVAL_IN_MILLIS = 500; + /** + * This is the default window duration on which the average resource utilization values will be calculated + */ + private static final long WINDOW_DURATION_IN_SECONDS = 30; + } + + public static final Setting GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( + "node.perf_tracker.global_cpu_usage.polling_interval", + TimeValue.timeValueMillis(Defaults.POLLING_INTERVAL_IN_MILLIS), + Setting.Property.NodeScope + ); + public static final Setting GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting( + "node.perf_tracker.global_cpu_usage.window_duration", + TimeValue.timeValueSeconds(Defaults.WINDOW_DURATION_IN_SECONDS), + //Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( + "node.perf_tracker.global_jvmmp.polling_interval", + TimeValue.timeValueMillis(Defaults.POLLING_INTERVAL_IN_MILLIS), + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting( + "node.perf_tracker.global_jvmmp.window_duration", + TimeValue.timeValueSeconds(Defaults.WINDOW_DURATION_IN_SECONDS), + //Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private volatile TimeValue cpuWindowDuration; + private volatile TimeValue cpuPollingInterval; + private volatile TimeValue memoryWindowDuration; + private volatile TimeValue memoryPollingInterval; + + public PerformanceTrackerSettings(Settings settings, ClusterSettings clusterSettings) { + this.cpuPollingInterval = GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); + this.cpuWindowDuration = GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); + this.memoryPollingInterval = GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); + this.memoryWindowDuration = GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); + + clusterSettings.addSettingsUpdateConsumer(GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, this::setCpuWindowDuration); + clusterSettings.addSettingsUpdateConsumer(GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, this::setMemoryWindowDuration); + } + + public TimeValue getCpuWindowDuration() { + return this.cpuWindowDuration; + } + + public TimeValue getCpuPollingInterval() { + return cpuPollingInterval; + } + + public TimeValue getMemoryPollingInterval() { + return memoryPollingInterval; + } + + public TimeValue getMemoryWindowDuration() { + return memoryWindowDuration; + } + + public void setCpuWindowDuration(TimeValue cpuWindowDuration) { + this.cpuWindowDuration = cpuWindowDuration; + } + + public void setMemoryWindowDuration(TimeValue memoryWindowDuration) { + this.memoryWindowDuration = memoryWindowDuration; + } } + + + + + + +