Skip to content

Commit

Permalink
changes for node perf tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Oct 6, 2023
1 parent 47032e5 commit df6c065
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 2 deletions.
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.ratelimiting.tracker.NodePerformanceTracker;
import org.opensearch.repositories.RepositoriesModule;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
Expand Down Expand Up @@ -1118,6 +1119,12 @@ protected Node(
);
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker(
threadPool,
settings,
clusterService.getClusterSettings(),
monitorService.fsService()
);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
Expand Down Expand Up @@ -1206,6 +1213,7 @@ protected Node(
b.bind(SearchRequestStats.class).toInstance(searchRequestStats);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
});
injector = modules.createInjector();

Expand Down Expand Up @@ -1315,6 +1323,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodePerformanceTracker.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1482,6 +1491,7 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
injector.getInstance(NodePerformanceTracker.class).stop();
nodeService.getTaskCancellationMonitoringService().stop();

pluginLifecycleComponents.forEach(LifecycleComponent::stop);
Expand Down Expand Up @@ -1547,6 +1557,8 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(nodeService.getTaskCancellationMonitoringService());
toClose.add(() -> stopWatch.stop().start("node_performance_tracker"));
toClose.add(injector.getInstance(NodePerformanceTracker.class));

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,96 @@
package org.opensearch.ratelimiting.tracker;public class NodePerformanceTracker {
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimiting.tracker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* This tracks the performance of node resources such as CPU, IO and memory
*/
public class NodePerformanceTracker extends AbstractLifecycleComponent {
private ThreadPool threadPool;
private final ClusterSettings clusterSettings;
private AverageIOUsageTracker ioUsageTracker;

private AverageDiskStats averageDiskStats;
private PerformanceTrackerSettings performanceTrackerSettings;
private FsService fsService;
private volatile Scheduler.Cancellable scheduledFuture;
private static final Logger logger = LogManager.getLogger(NodePerformanceTracker.class);

public NodePerformanceTracker(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings, FsService fsService) {
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.performanceTrackerSettings = new PerformanceTrackerSettings(settings, clusterSettings);
this.fsService = fsService;
initialize();
}

void initialize() {
ioUsageTracker = new AverageIOUsageTracker(
threadPool,
new TimeValue(1, TimeUnit.SECONDS),
new TimeValue(60, TimeUnit.SECONDS),
clusterSettings,
fsService
);
}

private AverageDiskStats getAverageIOUsed() {
return ioUsageTracker.getAverageDiskStats();
}

private void setAverageDiskStats(AverageDiskStats averageDiskStats) {
this.averageDiskStats = averageDiskStats;
}

private AverageDiskStats getAverageDiskStats() {
return averageDiskStats;
}

private void doRun() {
setAverageDiskStats(getAverageIOUsed());
logger.info(getAverageDiskStats().toString());
}
@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
} catch (Exception e) {

}
}, new TimeValue(1, TimeUnit.SECONDS), ThreadPool.Names.GENERIC);
ioUsageTracker.doStart();
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
ioUsageTracker.doStop();
}

@Override
protected void doClose() throws IOException {
ioUsageTracker.doClose();
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,101 @@
package org.opensearch.ratelimiting.tracker;public class PerformanceTrackerSettings {
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimiting.tracker;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

/**
* Settings related to node performance trackers such as polling interval, window duration etc
*/
public class PerformanceTrackerSettings {

private static class Defaults {
/**
* This is the default polling interval of usage trackers to get the resource utilization data
*/
private static final long POLLING_INTERVAL_IN_MILLIS = 500;
/**
* This is the default window duration on which the average resource utilization values will be calculated
*/
private static final long WINDOW_DURATION_IN_SECONDS = 30;
}

public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
"node.perf_tracker.global_cpu_usage.polling_interval",
TimeValue.timeValueMillis(Defaults.POLLING_INTERVAL_IN_MILLIS),
Setting.Property.NodeScope
);
public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
"node.perf_tracker.global_cpu_usage.window_duration",
TimeValue.timeValueSeconds(Defaults.WINDOW_DURATION_IN_SECONDS),
//Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
"node.perf_tracker.global_jvmmp.polling_interval",
TimeValue.timeValueMillis(Defaults.POLLING_INTERVAL_IN_MILLIS),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
"node.perf_tracker.global_jvmmp.window_duration",
TimeValue.timeValueSeconds(Defaults.WINDOW_DURATION_IN_SECONDS),
//Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile TimeValue cpuWindowDuration;
private volatile TimeValue cpuPollingInterval;
private volatile TimeValue memoryWindowDuration;
private volatile TimeValue memoryPollingInterval;

public PerformanceTrackerSettings(Settings settings, ClusterSettings clusterSettings) {
this.cpuPollingInterval = GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings);
this.cpuWindowDuration = GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings);
this.memoryPollingInterval = GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings);
this.memoryWindowDuration = GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, this::setCpuWindowDuration);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, this::setMemoryWindowDuration);
}

public TimeValue getCpuWindowDuration() {
return this.cpuWindowDuration;
}

public TimeValue getCpuPollingInterval() {
return cpuPollingInterval;
}

public TimeValue getMemoryPollingInterval() {
return memoryPollingInterval;
}

public TimeValue getMemoryWindowDuration() {
return memoryWindowDuration;
}

public void setCpuWindowDuration(TimeValue cpuWindowDuration) {
this.cpuWindowDuration = cpuWindowDuration;
}

public void setMemoryWindowDuration(TimeValue memoryWindowDuration) {
this.memoryWindowDuration = memoryWindowDuration;
}
}







0 comments on commit df6c065

Please sign in to comment.