Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Changed master total PendingQueueSize to PendingQueueSize per task ty…
Browse files Browse the repository at this point in the history
…pe (#296)

* Changed master total PendingQueueSize to PendingQueueSize per task type

* Addressed comments

Co-authored-by: Meet Shah <[email protected]>
  • Loading branch information
meetshah777 and meetshah777 authored Mar 31, 2021
1 parent bf6a4cc commit ea643fe
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.MasterPendingValue;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.MasterPendingTaskDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.elasticsearch.cluster.service.PendingClusterTask;
import java.util.HashMap;
import java.util.List;


@SuppressWarnings("unchecked")
public class MasterServiceMetrics extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
Expand Down Expand Up @@ -58,12 +63,32 @@ public void collectMetrics(long startTime) {
return;
}

/*
pendingTasks API returns object of PendingClusterTask which contains insertOrder, priority, source, timeInQueue.
Example :
insertOrder: 101,
priority: "URGENT",
source: "create-index [foo_9], cause [api]",
timeIn_queue: "86ms"
*/

List<PendingClusterTask> pendingTasks = ESResources.INSTANCE.getClusterService().getMasterService()
.pendingTasks();
HashMap<String,Integer> pendingTaskCountPerTaskType = new HashMap<>();

pendingTasks.stream().forEach( pendingTask -> {
String pendingTaskType = pendingTask.getSource().toString().split(" ",2)[0];
pendingTaskCountPerTaskType.put(pendingTaskType,
pendingTaskCountPerTaskType.getOrDefault(pendingTaskType,0)+1);
});

value.setLength(0);
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds())
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
value.append(new MasterPendingStatus(
ESResources.INSTANCE.getClusterService().getMasterService()
.numberOfPendingTasks()).serialize());
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
pendingTaskCountPerTaskType.forEach((pendingTaskType,PendingTaskValue) -> {
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
value.append(new MasterPendingStatus(pendingTaskType,PendingTaskValue).serialize());
});

saveMetricValues(value.toString(), startTime,
PerformanceAnalyzerMetrics.MASTER_CURRENT, PerformanceAnalyzerMetrics.MASTER_META_DATA);

Expand All @@ -73,11 +98,19 @@ public void collectMetrics(long startTime) {
}

public static class MasterPendingStatus extends MetricStatus {
private final String pendingTaskType;
private final int pendingTasksCount;
public MasterPendingStatus(int pendingTasksCount) {

public MasterPendingStatus(String pendingTaskType, int pendingTasksCount) {
this.pendingTaskType = pendingTaskType;
this.pendingTasksCount = pendingTasksCount;
}

@JsonProperty(MasterPendingTaskDimension.Constants.PENDING_TASK_TYPE)
public String getMasterTaskType(){
return pendingTaskType;
}

@JsonProperty(MasterPendingValue.Constants.PENDING_TASKS_COUNT_VALUE)
public int getPendingTasksCount() {
return pendingTasksCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -89,7 +86,7 @@ public void testGetMetricsPath() {
public void testCollectMetrics() {
masterServiceMetrics.collectMetrics(startTimeInMills);
String jsonStr = readMetricsInJsonString(1);
assertTrue(jsonStr.contains(MasterPendingValue.Constants.PENDING_TASKS_COUNT_VALUE));
assertFalse(jsonStr.contains(MasterPendingValue.Constants.PENDING_TASKS_COUNT_VALUE));
}

@Test
Expand All @@ -116,8 +113,8 @@ private String readMetricsInJsonString(int size) {
assert metrics.size() == size;
if (size != 0) {
String[] jsonStrs = metrics.get(0).value.split("\n");
assert jsonStrs.length == 2;
return jsonStrs[1];
assert jsonStrs.length == 1;
return jsonStrs[0];
} else {
return null;
}
Expand Down

0 comments on commit ea643fe

Please sign in to comment.