Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4554: Counter for used nodes within a DAG #362

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,24 @@ public enum DAGCounter {
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_REUSE_COUNT
TOTAL_CONTAINER_REUSE_COUNT,

/*
* Number of distinct nodes to which task attempts were assigned in this DAG.
* Nodes are distinguished by the Yarn NodeId, so two NodeIds on the same host
* with different ports are counted as two nodes.
*/
NODE_USED_COUNT,

/*
* Number of distinct node hosts to which task attempts were assigned in this DAG.
* Hosts are distinguished by Yarn NodeId.getHost(), so nodes that share a host
* (for example, differing only by port) are counted once.
*/
NODE_HOSTS_USED_COUNT,

/*
* Total number of nodes visible to the task scheduler (regardless of
* task assignments). This is typically exposed by a resource manager
* client.
*/
NODE_TOTAL_COUNT
}
4 changes: 2 additions & 2 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
Expand Down Expand Up @@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName,

void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
void addUsedContainer(ContainerId containerId);
void addUsedContainer(Container container);

/**
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.
Expand Down
20 changes: 16 additions & 4 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
Expand Down Expand Up @@ -250,6 +252,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<NodeId> nodesUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<String> nodeHostsUsedByCurrentDAG = new HashSet<>();


protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
Expand Down Expand Up @@ -2563,7 +2570,7 @@ public void onStart() {
@Override
public void onFinish() {
stopVertexServices();
handleUsedContainersOnDagFinish();
updateCounters();
}

private void startVertexServices() {
Expand All @@ -2579,11 +2586,16 @@ void stopVertexServices() {
}

@Override
public void addUsedContainer(ContainerId containerId) {
containersUsedByCurrentDAG.add(containerId);
public void addUsedContainer(Container container) {
containersUsedByCurrentDAG.add(container.getId());
nodesUsedByCurrentDAG.add(container.getNodeId());
nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can container.getNodeId() be null in any scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, it should not be; if it ever is null, that is a YARN bug.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhk

}

private void handleUsedContainersOnDagFinish() {
private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ public synchronized void taskAllocated(int schedulerId, Object task,
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), schedulerId, container.getId()));
}
appContext.getCurrentDAG().addUsedContainer(containerId);
appContext.getCurrentDAG().addUsedContainer(container);

TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.junit.Rule;
Expand All @@ -60,6 +61,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
Expand Down Expand Up @@ -181,6 +184,7 @@ public class TestDAGImpl {
private ACLManager aclManager;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
private TaskSchedulerManager taskSchedulerManager;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
Expand Down Expand Up @@ -861,11 +865,12 @@ public void setup() {
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
taskSchedulerManager = mock(TaskSchedulerManager.class);
execService = mock(ListeningExecutorService.class);
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(defaultShim);
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());

doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
Expand Down Expand Up @@ -2358,22 +2363,82 @@ public void testCounterLimits() {

}

@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testTotalContainersUsedCounter() {
  DAGImpl dagSpy = getDagSpy();

  // Register two containers that differ in their container id.
  ContainerId firstId = ContainerId.fromString("container_e16_1504924099862_7571_01_000005");
  Container firstContainer = Container.newInstance(firstId, mock(NodeId.class), null, null, null, null);
  dagSpy.addUsedContainer(firstContainer);

  ContainerId secondId = ContainerId.fromString("container_e16_1504924099862_7571_01_000006");
  Container secondContainer = Container.newInstance(secondId, mock(NodeId.class), null, null, null, null);
  dagSpy.addUsedContainer(secondContainer);

  // Counters are published when the DAG finishes.
  dagSpy.onFinish();

  // addUsedContainer was invoked exactly twice (the two calls above).
  verify(dagSpy, times(2)).addUsedContainer(any(Container.class));

  // Two distinct containers were used, so the counter must report 2.
  String dagCounterGroup = DAGCounter.class.getName();
  long containersUsed = dagSpy.getAllCounters()
      .getGroup(dagCounterGroup)
      .findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
      .getValue();
  Assert.assertEquals(2, containersUsed);
}

@Test(timeout = 5000)
public void testNodesUsedCounter() {
  DAGImpl dagSpy = getDagSpy();

  // Four containers spread over three NodeIds and two hosts.
  Container localhostPort0 = mock(Container.class);
  when(localhostPort0.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
  Container localhostPort0Again = mock(Container.class);
  when(localhostPort0Again.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
  Container otherhostPort0 = mock(Container.class);
  when(otherhostPort0.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
  Container localhostPort1 = mock(Container.class);
  when(localhostPort1.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));

  dagSpy.addUsedContainer(localhostPort0);
  dagSpy.addUsedContainer(localhostPort0Again);
  dagSpy.addUsedContainer(otherhostPort0);
  dagSpy.addUsedContainer(localhostPort1);

  // The total node count is read from the task scheduler when the DAG finishes.
  when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10);

  dagSpy.onFinish();

  // addUsedContainer was invoked once per container above.
  verify(dagSpy, times(4)).addUsedContainer(any(Container.class));

  String dagCounterGroup = DAGCounter.class.getName();

  // Three nodes were used: localhost:0, otherhost:0 and localhost:1.
  // localhost:0 and localhost:1 may live on the same physical machine, but as long as
  // yarn treats them as separate nodes, so do we.
  long nodesUsed = dagSpy.getAllCounters()
      .getGroup(dagCounterGroup)
      .findCounter(DAGCounter.NODE_USED_COUNT.name())
      .getValue();
  Assert.assertEquals(3, nodesUsed);

  Assert.assertTrue(dagSpy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0")));
  Assert.assertTrue(dagSpy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0")));
  Assert.assertTrue(dagSpy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1")));

  // Only two distinct hosts were seen: localhost and otherhost.
  long hostsUsed = dagSpy.getAllCounters()
      .getGroup(dagCounterGroup)
      .findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name())
      .getValue();
  Assert.assertEquals(2, hostsUsed);

  // The cluster-wide node count comes straight from the mocked task scheduler.
  long totalNodes = dagSpy.getAllCounters()
      .getGroup(dagCounterGroup)
      .findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
      .getValue();
  Assert.assertEquals(10, totalNodes);

  Assert.assertTrue(dagSpy.nodeHostsUsedByCurrentDAG.contains("localhost"));
  Assert.assertTrue(dagSpy.nodeHostsUsedByCurrentDAG.contains("otherhost"));
}

private DAGImpl getDagSpy() {
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();

DAGImpl spy = spy(mrrDag);
spy.addUsedContainer(mock(ContainerId.class));
spy.addUsedContainer(mock(ContainerId.class));
// needed when onFinish() method is called on a DAGImpl
when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);

spy.onFinish();
// 2 calls to addUsedContainer, obviously, we did it here
verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
// 1 call to setDagCounter, which happened at dag.onFinish
verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
return spy(mrrDag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testSimpleAllocate() throws Exception {
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());

verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); // called on taskAllocated
}

@Test(timeout = 5000)
Expand Down
Loading