Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Oct 6, 2023
1 parent e7b0c49 commit 525ed11
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.GlobalPerformanceStats;
import org.opensearch.node.NodesPerformanceStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -144,7 +144,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
private SearchPipelineStats searchPipelineStats;

@Nullable
private GlobalPerformanceStats globalPerformanceStats;
private NodesPerformanceStats nodesPerformanceStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -202,10 +202,10 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.11 when we backport
globalPerformanceStats = in.readOptionalWriteable(GlobalPerformanceStats::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
nodesPerformanceStats = in.readOptionalWriteable(NodesPerformanceStats::new);
} else {
globalPerformanceStats = null;
nodesPerformanceStats = null;
}
}

Expand All @@ -225,7 +225,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable GlobalPerformanceStats globalPerformanceStats,
@Nullable NodesPerformanceStats nodesPerformanceStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
Expand All @@ -251,7 +251,7 @@ public NodeStats(
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.globalPerformanceStats = globalPerformanceStats;
this.nodesPerformanceStats = nodesPerformanceStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
Expand Down Expand Up @@ -356,8 +356,8 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
}

@Nullable
public GlobalPerformanceStats getNodesPerformanceStats() {
return globalPerformanceStats;
public NodesPerformanceStats getNodesPerformanceStats() {
return nodesPerformanceStats;
}

@Nullable
Expand Down Expand Up @@ -446,8 +446,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO : make it 2.11 when we backport
out.writeOptionalWriteable(globalPerformanceStats);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
out.writeOptionalWriteable(nodesPerformanceStats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public enum Metric {
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
GLOBAL_PERFORMANCE_STATS("performance_stats");
PERFORMANCE_STATS("performance_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.PERFORMANCE_STATS.containedIn(metrics)
);
}

Expand Down
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ protected Node(
settings,
clusterService.getClusterSettings()
);
final PerfStatsCollectorService perfStatsCollectorService = new PerfStatsCollectorService(
final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(
nodePerformanceTracker,
clusterService,
threadPool
Expand All @@ -1102,7 +1102,7 @@ protected Node(
searchPipelineService,
fileCache,
taskCancellationMonitoringService,
perfStatsCollectorService
performanceCollectorService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1223,8 +1223,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(PerfStatsCollectorService.class).toInstance(perfStatsCollectorService);
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
Expand Down Expand Up @@ -1342,7 +1342,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodePerformanceTracker.class).start();
injector.getInstance(PerfStatsCollectorService.class).start();
injector.getInstance(PerformanceCollectorService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1506,7 +1506,7 @@ private Node stop() {
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodePerformanceTracker.class).stop();
injector.getInstance(PerfStatsCollectorService.class).stop();
injector.getInstance(PerformanceCollectorService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down Expand Up @@ -1573,7 +1573,7 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop().start("node_performance_tracker"));
toClose.add(injector.getInstance(NodePerformanceTracker.class));
toClose.add(() -> stopWatch.stop().start("perf_stats_collector"));
toClose.add(injector.getInstance(PerfStatsCollectorService.class));
toClose.add(injector.getInstance(PerformanceCollectorService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
* This represents the performance stats of a node along with the timestamp at which the stats object was created
* in the respective node
*/
public class NodePerformanceStatistics implements Writeable {
public class NodePerformanceStats implements Writeable {
final String nodeId;
long timestamp;
double cpuUtilizationPercent;
double memoryUtilizationPercent;

public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
public NodePerformanceStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
this.nodeId = nodeId;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
this.timestamp = timestamp;
}

public NodePerformanceStatistics(StreamInput in) throws IOException {
public NodePerformanceStats(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.timestamp = in.readLong();
this.cpuUtilizationPercent = in.readDouble();
Expand All @@ -58,12 +58,10 @@ public String toString() {
return sb.toString();
}

NodePerformanceStatistics(NodePerformanceStatistics nodePerformanceStatistics) {
NodePerformanceStats(NodePerformanceStats nodePerformanceStats) {
this(
nodePerformanceStatistics.nodeId,
nodePerformanceStatistics.cpuUtilizationPercent,
nodePerformanceStatistics.memoryUtilizationPercent,
nodePerformanceStatistics.timestamp
nodePerformanceStats.nodeId,
nodePerformanceStats.timestamp, nodePerformanceStats.memoryUtilizationPercent, nodePerformanceStats.cpuUtilizationPercent
);
}

Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final PerfStatsCollectorService perfStatsCollectorService;
private final PerformanceCollectorService performanceCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class NodeService implements Closeable {
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
PerfStatsCollectorService perfStatsCollectorService
PerformanceCollectorService performanceCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -139,7 +139,7 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.perfStatsCollectorService = perfStatsCollectorService;
this.performanceCollectorService = performanceCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -241,7 +241,7 @@ public NodeStats stats(
discoveryStats ? discovery.stats() : null,
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
nodesPerfStats ? perfStatsCollectorService.stats() : null,
nodesPerfStats ? performanceCollectorService.stats() : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
* This class represents performance stats such as CPU, Memory and IO resource usage of each node along with the time
* elapsed from when the stats were recorded.
*/
public class GlobalPerformanceStats implements Writeable, ToXContentFragment {
public class NodesPerformanceStats implements Writeable, ToXContentFragment {

// Map of node id to perf stats of the corresponding node.
private final Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap;
private final Map<String, NodePerformanceStats> nodeIdToPerfStatsMap;

public GlobalPerformanceStats(Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap) {
public NodesPerformanceStats(Map<String, NodePerformanceStats> nodeIdToPerfStatsMap) {
this.nodeIdToPerfStatsMap = nodeIdToPerfStatsMap;
}

public GlobalPerformanceStats(StreamInput in) throws IOException {
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStatistics::new);
public NodesPerformanceStats(StreamInput in) throws IOException {
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStats::new);
}

@Override
Expand All @@ -45,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException {
/**
* Returns map of node id to perf stats of the corresponding node.
*/
public Map<String, NodePerformanceStatistics> getNodeIdToNodePerfStatsMap() {
public Map<String, NodePerformanceStats> getNodeIdToNodePerfStatsMap() {
return nodeIdToPerfStatsMap;
}

Expand All @@ -54,7 +54,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject("performance_stats");
for (String nodeId : nodeIdToPerfStatsMap.keySet()) {
builder.startObject(nodeId);
NodePerformanceStatistics perfStats = nodeIdToPerfStatsMap.get(nodeId);
NodePerformanceStats perfStats = nodeIdToPerfStatsMap.get(nodeId);
if (perfStats != null) {
builder.field(
"elapsed_time",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -31,24 +30,28 @@
* This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for
* coordinator node to aid in throttling, ranking etc
*/
public class PerfStatsCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {
public class PerformanceCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {

/**
* This refresh interval denotes the polling interval of PerfStatsCollectorService to refresh the performance stats
* This refresh interval denotes the polling interval of PerformanceCollectorService to refresh the performance stats
* from local node
*/
private static long REFRESH_INTERVAL_IN_MILLIS = 1000;

private static final Logger logger = LogManager.getLogger(PerfStatsCollectorService.class);
private final ConcurrentMap<String, NodePerformanceStatistics> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();
private static final Logger logger = LogManager.getLogger(PerformanceCollectorService.class);
private final ConcurrentMap<String, NodePerformanceStats> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();

private ThreadPool threadPool;
private volatile Scheduler.Cancellable scheduledFuture;

private NodePerformanceTracker nodePerformanceTracker;
private ClusterService clusterService;

public PerfStatsCollectorService(NodePerformanceTracker nodePerformanceTracker, ClusterService clusterService, ThreadPool threadPool) {
public PerformanceCollectorService(
NodePerformanceTracker nodePerformanceTracker,
ClusterService clusterService,
ThreadPool threadPool
) {
this.threadPool = threadPool;
this.nodePerformanceTracker = nodePerformanceTracker;
this.clusterService = clusterService;
Expand All @@ -71,10 +74,10 @@ void removeNodePerfStatistics(String nodeId) {
/**
* Collect node performance statistics along with the timestamp
*/
public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
public void collectNodePerfStatistics(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> {
if (nodePerfStats == null) {
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp);
return new NodePerformanceStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent);
} else {
nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent;
nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent;
Expand All @@ -87,9 +90,9 @@ public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercen
/**
* Get all node statistics which will be used for node stats
*/
public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
Map<String, NodePerformanceStatistics> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStatistics(nodePerfStats)); });
public Map<String, NodePerformanceStats> getAllNodeStatistics() {
Map<String, NodePerformanceStats> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStats(nodePerfStats)); });
return nodeStats;
}

Expand All @@ -98,27 +101,27 @@ public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
* performance stats information exists for the given node. Returns an empty
* {@code Optional} if the node was not found.
*/
public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId) {
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStatistics(perfStats));
public Optional<NodePerformanceStats> getNodeStatistics(final String nodeId) {
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStats(perfStats));
}

/**
* Returns collected performance statistics of all nodes
*/
public GlobalPerformanceStats stats() {
return new GlobalPerformanceStats(getAllNodeStatistics());
public NodesPerformanceStats stats() {
return new NodesPerformanceStats(getAllNodeStatistics());
}

/**
* Fetch local node performance statistics and add it to store along with the current timestamp
*/
private void getLocalNodePerformanceStats() {
private void collectLocalNodePerformanceStats() {
if (nodePerformanceTracker.isReady() && clusterService.state() != null) {
collectNodePerfStatistics(
clusterService.state().nodes().getLocalNodeId(),
nodePerformanceTracker.getCpuUtilizationPercent(),
System.currentTimeMillis(),
nodePerformanceTracker.getMemoryUtilizationPercent(),
System.currentTimeMillis()
nodePerformanceTracker.getCpuUtilizationPercent()
);
}
}
Expand All @@ -130,9 +133,9 @@ protected void doStart() {
*/
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
getLocalNodePerformanceStats();
collectLocalNodePerformanceStats();
} catch (Exception e) {
logger.warn("failure in PerfStatsCollectorService", e);
logger.warn("failure in PerformanceCollectorService", e);
}
}, new TimeValue(REFRESH_INTERVAL_IN_MILLIS), ThreadPool.Names.GENERIC);
}
Expand All @@ -145,5 +148,5 @@ protected void doStop() {
}

@Override
protected void doClose() throws IOException {}
protected void doClose() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -91,5 +90,5 @@ protected void doStop() {
}

@Override
protected void doClose() throws IOException {}
protected void doClose() {}
}
Loading

0 comments on commit 525ed11

Please sign in to comment.