Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4554: Counter for used nodes within a DAG #362

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,18 @@ public enum DAGCounter {
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_REUSE_COUNT
TOTAL_CONTAINER_REUSE_COUNT,

/*
* Number of nodes to which task attempts were assigned in this DAG.
* Nodes are distinguished by the Yarn NodeId.getHost().
*/
NODE_USED_COUNT,

/*
* Total number of nodes visible to the task scheduler (regardless of
* task assignments). This is typically exposed by a resource manager
* client.
*/
NODE_TOTAL_COUNT
}
4 changes: 2 additions & 2 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
Expand Down Expand Up @@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName,

void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
void addUsedContainer(ContainerId containerId);
void addUsedContainer(Container container);

/**
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.
Expand Down
14 changes: 10 additions & 4 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
Expand Down Expand Up @@ -250,6 +251,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<String> nodesUsedByCurrentDAG = new HashSet<>();

protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
Expand Down Expand Up @@ -2563,7 +2566,7 @@ public void onStart() {
@Override
public void onFinish() {
stopVertexServices();
handleUsedContainersOnDagFinish();
updateCounters();
}

private void startVertexServices() {
Expand All @@ -2579,11 +2582,14 @@ void stopVertexServices() {
}

@Override
public void addUsedContainer(ContainerId containerId) {
containersUsedByCurrentDAG.add(containerId);
public void addUsedContainer(Container container) {
containersUsedByCurrentDAG.add(container.getId());
nodesUsedByCurrentDAG.add(container.getNodeId().getHost());
}

private void handleUsedContainersOnDagFinish() {
private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,24 @@ public void setSignalled(boolean isSignalled) {
}

public int getNumClusterNodes() {
return getNumClusterNodes(false);
}

public int getNumClusterNodes(boolean tryUpdate){
if (cachedNodeCount == -1 && tryUpdate){
cachedNodeCount = countAllNodes();
}
return cachedNodeCount;
}


private int countAllNodes() {
try {
return taskSchedulers[0].getClusterNodeCount();
} catch (Exception e) {
return handleTaskSchedulerExceptionWhileGettingNodeCount(e);
}
}

public Resource getAvailableResources(int schedulerId) {
try {
return taskSchedulers[schedulerId].getAvailableResources();
Expand Down Expand Up @@ -746,7 +761,7 @@ public synchronized void taskAllocated(int schedulerId, Object task,
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), schedulerId, container.getId()));
}
appContext.getCurrentDAG().addUsedContainer(containerId);
appContext.getCurrentDAG().addUsedContainer(container);

TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
Expand Down Expand Up @@ -883,26 +898,25 @@ public float getProgress(int schedulerId) {
// Doubles as a mechanism to update node counts periodically. Hence schedulerId required.

// TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
int nodeCount = 0;
try {
nodeCount = taskSchedulers[0].getClusterNodeCount();
} catch (Exception e) {
String msg = "Error in TaskScheduler while getting node count"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
}
int nodeCount = countAllNodes();
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
}
return dagAppMaster.getProgress();
}

private int handleTaskSchedulerExceptionWhileGettingNodeCount(Exception e) {
String msg = "Error in TaskScheduler while getting node count"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(0, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
}

public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError,
String diagnostics,
DagInfo dagInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.junit.Rule;
Expand All @@ -60,6 +62,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
Expand Down Expand Up @@ -181,6 +185,7 @@ public class TestDAGImpl {
private ACLManager aclManager;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
private TaskSchedulerManager taskSchedulerManager;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
Expand Down Expand Up @@ -861,11 +866,12 @@ public void setup() {
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
taskSchedulerManager = mock(TaskSchedulerManager.class);
execService = mock(ListeningExecutorService.class);
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(defaultShim);
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());

doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
Expand Down Expand Up @@ -2358,22 +2364,71 @@ public void testCounterLimits() {

}

@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testTotalContainersUsedCounter() {
DAGImpl spy = getDagSpy();

spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"),
mock(NodeId.class), null, null, null, null));
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"),
mock(NodeId.class), null, null, null, null));

spy.onFinish();
// 2 calls to addUsedContainer
verify(spy, times(2)).addUsedContainer(any(Container.class));
// 2 containers were used
Assert.assertEquals(2,
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
.getValue());
}

@Test(timeout = 5000)
public void testNodesUsedCounter() {
DAGImpl spy = getDagSpy();

Container containerOnHost = mock(Container.class);
when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
Container containerOnSameHost = mock(Container.class);
when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
Container containerOnDifferentHost = mock(Container.class);
when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
Container containerOnSameHostWithDifferentPort = mock(Container.class);
when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));

spy.addUsedContainer(containerOnHost);
spy.addUsedContainer(containerOnSameHost);
spy.addUsedContainer(containerOnDifferentHost);
spy.addUsedContainer(containerOnSameHostWithDifferentPort);

when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10);

spy.onFinish();
// 4 calls to addUsedContainer
verify(spy, times(4)).addUsedContainer(any(Container.class));

// 2 distinct node hosts were seen: localhost, otherhost
Assert.assertEquals(2,
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost"));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost"));

Assert.assertEquals(10,
spy.getAllCounters().getGroup(DAGCounter.class.getName())
.findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
.getValue());
}

private DAGImpl getDagSpy() {
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();

DAGImpl spy = spy(mrrDag);
spy.addUsedContainer(mock(ContainerId.class));
spy.addUsedContainer(mock(ContainerId.class));
// needed when onFinish() method is called on a DAGImpl
when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);

spy.onFinish();
// 2 calls to addUsedContainer, obviously, we did it here
verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
// 1 call to setDagCounter, which happened at dag.onFinish
verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
return spy(mrrDag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testSimpleAllocate() throws Exception {
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());

verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); // called on taskAllocated
}

@Test(timeout = 5000)
Expand Down
Loading