Skip to content

Commit

Permalink
rework
Browse files (browse the repository at this point in the history)
Change-Id: Ic72fbe3d490e2729ece74f07b42891d1c191b1c5
  • Loading branch information
abstractdog committed Jun 21, 2024
1 parent 547c859 commit bb49935
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,10 @@ public enum DAGCounter {

/*
* Number of nodes to which task attempts were assigned in this DAG.
* Nodes are distinguished by the Yarn NodeId.
* Nodes are distinguished by the Yarn NodeId.getHost().
*/
NODE_USED_COUNT,

/*
* Number of node hosts to which task attempts were assigned in this DAG.
* Nodes are distinguished by Yarn NodeId.getHost()
*/
NODE_HOSTS_USED_COUNT,

/*
* Total number of nodes visible to the task scheduler (regardless of
* task assignments). This is typically exposed by a resource manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<NodeId> nodesUsedByCurrentDAG = new HashSet<>();
@VisibleForTesting
final Set<String> nodeHostsUsedByCurrentDAG = new HashSet<>();

final Set<String> nodesUsedByCurrentDAG = new HashSet<>();

protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
Expand Down Expand Up @@ -2588,14 +2585,12 @@ void stopVertexServices() {
@Override
public void addUsedContainer(Container container) {
containersUsedByCurrentDAG.add(container.getId());
nodesUsedByCurrentDAG.add(container.getNodeId());
nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost());
nodesUsedByCurrentDAG.add(container.getNodeId().getHost());
}

private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,29 @@ public void setSignalled(boolean isSignalled) {
}

/**
 * Returns the cached cluster node count without attempting to refresh it.
 *
 * <p>Simply delegates to {@link #getNumClusterNodes(boolean)} with the
 * update flag switched off.
 *
 * @return the value produced by {@code getNumClusterNodes(false)}
 */
public int getNumClusterNodes() {
  final boolean tryUpdate = false;
  return getNumClusterNodes(tryUpdate);
}

/**
 * Returns the cached cluster node count, optionally filling the cache first.
 *
 * <p>The cache is recomputed via {@code countAllNodes()} only when the caller
 * asks for it AND the cached value is still {@code -1}; in every other case
 * the currently cached value is handed back untouched.
 * NOTE(review): {@code -1} looks like the "not yet known" sentinel - confirm
 * against the field's initializer.
 *
 * @param tryUpdate whether a cache holding {@code -1} may be replaced by a
 *                  fresh count
 * @return the cached node count (possibly just refreshed)
 */
public int getNumClusterNodes(boolean tryUpdate) {
  // Nothing to do unless an update was requested and the cache holds -1.
  if (!tryUpdate || cachedNodeCount != -1) {
    return cachedNodeCount;
  }
  cachedNodeCount = countAllNodes();
  return cachedNodeCount;
}


/**
 * Sums {@code getClusterNodeCount()} over every entry of {@code taskSchedulers}.
 *
 * <p>Any exception raised while iterating is passed, together with the index
 * of the scheduler being queried at that moment, to
 * {@code handleTaskSchedulerException}.
 *
 * @return the accumulated node count across all task schedulers
 */
private int countAllNodes() {
  int total = 0;
  // Declared outside the try block so the catch clause can report which
  // scheduler was being queried when the failure happened.
  int idx = 0;
  try {
    while (idx < taskSchedulers.length) {
      total += taskSchedulers[idx].getClusterNodeCount();
      idx++;
    }
  } catch (Exception e) {
    handleTaskSchedulerException(e, idx);
  }
  return total;
}

public Resource getAvailableResources(int schedulerId) {
try {
return taskSchedulers[schedulerId].getAvailableResources();
Expand Down Expand Up @@ -887,14 +907,7 @@ public float getProgress(int schedulerId) {
try {
nodeCount = taskSchedulers[0].getClusterNodeCount();
} catch (Exception e) {
String msg = "Error in TaskScheduler while getting node count"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
handleTaskSchedulerException(e, 0);
}
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
Expand All @@ -903,6 +916,17 @@ public float getProgress(int schedulerId) {
return dagAppMaster.getProgress();
}

/**
 * Handles a failure that occurred while asking a task scheduler for its
 * node count.
 *
 * <p>Logs the error, sends a
 * {@code TASK_SCHEDULER_SERVICE_FATAL_ERROR} event carrying the same message
 * and cause, and then rethrows the cause wrapped in a
 * {@link RuntimeException}. This method never returns normally.
 *
 * @param e           the exception thrown by the task scheduler
 * @param schedulerId index of the scheduler, used to build its identifier
 *                    string for the message
 * @throws RuntimeException always, wrapping {@code e}
 */
private void handleTaskSchedulerException(Exception e, int schedulerId) {
  final String schedulerName = Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
  final String msg = "Error in TaskScheduler while getting node count"
      + ", scheduler=" + schedulerName;
  LOG.error(msg, e);

  DAGAppMasterEventUserServiceFatalError fatalError =
      new DAGAppMasterEventUserServiceFatalError(
          DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, msg, e);
  sendEvent(fatalError);

  throw new RuntimeException(e);
}

public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError,
String diagnostics,
DagInfo dagInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -2399,35 +2400,24 @@ public void testNodesUsedCounter() {
spy.addUsedContainer(containerOnDifferentHost);
spy.addUsedContainer(containerOnSameHostWithDifferentPort);

when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10);
when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10);

spy.onFinish();
// 4 calls to addUsedContainer
verify(spy, times(4)).addUsedContainer(any(Container.class));
// 3 nodes were used: localhost:0, otherhost:0, localhost:1
// localhost:0 and localhost:1 might be on the same physical host, but as long as
// yarn considers them different nodes, we consider them different too
Assert.assertEquals(3,
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0")));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0")));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1")));

// 2 distinct node hosts were seen: localhost, otherhost
Assert.assertEquals(2,
spy.getAllCounters().getGroup(DAGCounter.class.getName())
.findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name())
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost"));
Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost"));

Assert.assertEquals(10,
spy.getAllCounters().getGroup(DAGCounter.class.getName())
.findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
.getValue());

Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost"));
Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost"));
}

private DAGImpl getDagSpy() {
Expand Down

0 comments on commit bb49935

Please sign in to comment.