From 547c8594c866cfb3cabbcb756db7ed8516962f01 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Wed, 19 Jun 2024 15:02:13 +0200 Subject: [PATCH 1/3] TEZ-4554: Counter for used nodes within a DAG --- .../tez/common/counters/DAGCounter.java | 21 ++++- .../java/org/apache/tez/dag/app/dag/DAG.java | 4 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 20 ++++- .../tez/dag/app/rm/TaskSchedulerManager.java | 2 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 85 ++++++++++++++++--- .../dag/app/rm/TestTaskSchedulerManager.java | 2 +- 6 files changed, 115 insertions(+), 19 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index 149bacdf5c..ffc4e39b4e 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -87,5 +87,24 @@ public enum DAGCounter { * Number of container reuses during a DAG. This is incremented every time * the containerReused callback is called in the TaskSchedulerContext. */ - TOTAL_CONTAINER_REUSE_COUNT + TOTAL_CONTAINER_REUSE_COUNT, + + /* + * Number of nodes to which task attempts were assigned in this DAG. + * Nodes are distinguished by the Yarn NodeId. + */ + NODE_USED_COUNT, + + /* + * Number of node hosts to which task attempts were assigned in this DAG. + * Nodes are distinguished by Yarn NodeId.getHost() + */ + NODE_HOSTS_USED_COUNT, + + /* + * Total number of nodes visible to the task scheduler (regardless of + * task assignments). This is typically exposed by a resource manager + * client. + */ + NODE_TOTAL_COUNT } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index ff5afb4099..c828d81b2b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -23,7 +23,7 @@ import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.counters.DAGCounter; @@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName, void incrementDagCounter(DAGCounter counter, int incrValue); void setDagCounter(DAGCounter counter, int setValue); - void addUsedContainer(ContainerId containerId); + void addUsedContainer(Container container); /** * Called by the DAGAppMaster when the DAG is started normally or in the event of recovery. diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index b00cea8b24..98492a3cc8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -61,8 +61,10 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -250,6 +252,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); private final Set containersUsedByCurrentDAG = new HashSet<>(); + @VisibleForTesting + final Set nodesUsedByCurrentDAG = new HashSet<>(); + @VisibleForTesting + final Set nodeHostsUsedByCurrentDAG = new HashSet<>(); + protected static final StateMachineFactory @@ -2563,7 +2570,7 @@ public void onStart() { @Override public void onFinish() { stopVertexServices(); - handleUsedContainersOnDagFinish(); + updateCounters(); } private void startVertexServices() { @@ -2579,11 +2586,16 @@ void stopVertexServices() { } @Override - public void addUsedContainer(ContainerId containerId) { - containersUsedByCurrentDAG.add(containerId); + public void addUsedContainer(Container container) { + containersUsedByCurrentDAG.add(container.getId()); + nodesUsedByCurrentDAG.add(container.getNodeId()); + nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost()); } - private void handleUsedContainersOnDagFinish() { + private void updateCounters() { setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size()); + setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size()); + setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size()); + setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes()); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 0bf62afbc3..27adbc1fc3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -746,7 +746,7 @@ public synchronized void taskAllocated(int schedulerId, Object task, sendEvent(new AMNodeEventContainerAllocated(container .getNodeId(), schedulerId, container.getId())); } - appContext.getCurrentDAG().addUsedContainer(containerId); + appContext.getCurrentDAG().addUsedContainer(container); TaskAttempt taskAttempt = event.getTaskAttempt(); // TODO - perhaps check if the task still needs this container diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 1e003accc5..8baab5e35b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -49,6 +49,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; +import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.hadoop.shim.HadoopShim; import org.junit.Rule; @@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; @@ -181,6 +184,7 @@ public class TestDAGImpl { private ACLManager aclManager; private ApplicationAttemptId appAttemptId; private DAGImpl dag; + private TaskSchedulerManager taskSchedulerManager; private TaskEventDispatcher taskEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; @@ -861,11 +865,12 @@ public void setup() { dispatcher = new DrainDispatcher(); fsTokens = new Credentials(); appContext = mock(AppContext.class); + taskSchedulerManager = mock(TaskSchedulerManager.class); execService = mock(ListeningExecutorService.class); final ListenableFuture mockFuture = mock(ListenableFuture.class); when(appContext.getHadoopShim()).thenReturn(defaultShim); when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId()); - + doAnswer(new Answer() { public ListenableFuture answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); @@ -2358,22 +2363,82 @@ public void testCounterLimits() { } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testTotalContainersUsedCounter() { + DAGImpl spy = getDagSpy(); + + spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"), + mock(NodeId.class), null, null, null, null)); + spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"), + mock(NodeId.class), null, null, null, null)); + + spy.onFinish(); + // 2 calls to addUsedContainer + verify(spy, times(2)).addUsedContainer(any(Container.class)); + // 2 containers were used + Assert.assertEquals(2, + spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name()) + .getValue()); + } + + @Test(timeout = 5000) + public void testNodesUsedCounter() { + DAGImpl spy = getDagSpy(); + + Container containerOnHost = mock(Container.class); + when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0")); + Container containerOnSameHost = mock(Container.class); + when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0")); + Container containerOnDifferentHost = mock(Container.class); + when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0")); + Container containerOnSameHostWithDifferentPort = mock(Container.class); + when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1")); + + spy.addUsedContainer(containerOnHost); + spy.addUsedContainer(containerOnSameHost); + spy.addUsedContainer(containerOnDifferentHost); + spy.addUsedContainer(containerOnSameHostWithDifferentPort); + + when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10); + + spy.onFinish(); + // 4 calls to addUsedContainer + verify(spy, times(4)).addUsedContainer(any(Container.class)); + // 3 nodes were used: localhost:0, otherhost:0, localhost:1 + // localhost:0 and localhost:1 might be on the same physical host, but as long as + // yarn considers them different nodes, we consider them different too + Assert.assertEquals(3, + spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name()) + .getValue()); + + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0"))); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0"))); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1"))); + + // 2 distinct node hosts were seen: localhost, otherhost + Assert.assertEquals(2, + spy.getAllCounters().getGroup(DAGCounter.class.getName()) + .findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name()) + .getValue()); + + Assert.assertEquals(10, + spy.getAllCounters().getGroup(DAGCounter.class.getName()) + .findCounter(DAGCounter.NODE_TOTAL_COUNT.name()) + .getValue()); + + Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost")); + Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost")); + } + + private DAGImpl getDagSpy() { initDAG(mrrDag); dispatcher.await(); startDAG(mrrDag); dispatcher.await(); - DAGImpl spy = spy(mrrDag); - spy.addUsedContainer(mock(ContainerId.class)); - spy.addUsedContainer(mock(ContainerId.class)); + // needed when onFinish() method is called on a DAGImpl + when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager); - spy.onFinish(); - // 2 calls to addUsedContainer, obviously, we did it here - verify(spy, times(2)).addUsedContainer(any(ContainerId.class)); - // 1 call to setDagCounter, which happened at dag.onFinish - verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2); + return spy(mrrDag); } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index e3dd1ac924..901a9df71b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -234,7 +234,7 @@ public void testSimpleAllocate() throws Exception { assertEquals(priority, assignEvent.getPriority()); assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); - verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class)); // called on taskAllocated + verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); // called on taskAllocated } @Test(timeout = 5000) From fdd73619ca375451bf33f527f810ac72ebc4ca07 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Fri, 21 Jun 2024 16:47:22 +0200 Subject: [PATCH 2/3] rework Change-Id: Ic72fbe3d490e2729ece74f07b42891d1c191b1c5 --- .../tez/common/counters/DAGCounter.java | 8 +--- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++---- .../tez/dag/app/rm/TaskSchedulerManager.java | 42 +++++++++++++++---- .../tez/dag/app/dag/impl/TestDAGImpl.java | 22 +++------- 4 files changed, 43 insertions(+), 41 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index ffc4e39b4e..23c1978430 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -91,16 +91,10 @@ public enum DAGCounter { /* * Number of nodes to which task attempts were assigned in this DAG. - * Nodes are distinguished by the Yarn NodeId. + * Nodes are distinguished by the Yarn NodeId.getHost(). */ NODE_USED_COUNT, - /* - * Number of node hosts to which task attempts were assigned in this DAG. - * Nodes are distinguished by Yarn NodeId.getHost() - */ - NODE_HOSTS_USED_COUNT, - /* * Total number of nodes visible to the task scheduler (regardless of * task assignments). This is typically exposed by a resource manager diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 98492a3cc8..3075807417 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -253,10 +252,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); private final Set containersUsedByCurrentDAG = new HashSet<>(); @VisibleForTesting - final Set nodesUsedByCurrentDAG = new HashSet<>(); - @VisibleForTesting - final Set nodeHostsUsedByCurrentDAG = new HashSet<>(); - + final Set nodesUsedByCurrentDAG = new HashSet<>(); protected static final StateMachineFactory @@ -2588,14 +2584,12 @@ void stopVertexServices() { @Override public void addUsedContainer(Container container) { containersUsedByCurrentDAG.add(container.getId()); - nodesUsedByCurrentDAG.add(container.getNodeId()); - nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost()); + nodesUsedByCurrentDAG.add(container.getNodeId().getHost()); } private void updateCounters() { setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size()); setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size()); - setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size()); - setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes()); + setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true)); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 27adbc1fc3..148429c986 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -224,9 +224,29 @@ public void setSignalled(boolean isSignalled) { } public int getNumClusterNodes() { + return getNumClusterNodes(false); + } + + public int getNumClusterNodes(boolean tryUpdate){ + if (cachedNodeCount == -1 && tryUpdate){ + cachedNodeCount = countAllNodes(); + } return cachedNodeCount; } - + + private int countAllNodes() { + int nodeCount = 0; + int schedulerId = 0; + try { + for (schedulerId = 0; schedulerId < taskSchedulers.length; schedulerId++) { + nodeCount += taskSchedulers[schedulerId].getClusterNodeCount(); + } + } catch (Exception e) { + handleTaskSchedulerException(e, schedulerId); + } + return nodeCount; + } + public Resource getAvailableResources(int schedulerId) { try { return taskSchedulers[schedulerId].getAvailableResources(); @@ -887,14 +907,7 @@ public float getProgress(int schedulerId) { try { nodeCount = taskSchedulers[0].getClusterNodeCount(); } catch (Exception e) { - String msg = "Error in TaskScheduler while getting node count" - + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); - LOG.error(msg, e); - sendEvent( - new DAGAppMasterEventUserServiceFatalError( - DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, - msg, e)); - throw new RuntimeException(e); + handleTaskSchedulerException(e, 0); } if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; @@ -903,6 +916,17 @@ public float getProgress(int schedulerId) { return dagAppMaster.getProgress(); } + private void handleTaskSchedulerException(Exception e, int schedulerId) { + String msg = "Error in TaskScheduler while getting node count" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + throw new RuntimeException(e); + } + public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagInfo) { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 8baab5e35b..46c4fe1cff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -2399,35 +2400,24 @@ public void testNodesUsedCounter() { spy.addUsedContainer(containerOnDifferentHost); spy.addUsedContainer(containerOnSameHostWithDifferentPort); - when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10); + when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10); spy.onFinish(); // 4 calls to addUsedContainer verify(spy, times(4)).addUsedContainer(any(Container.class)); - // 3 nodes were used: localhost:0, otherhost:0, localhost:1 - // localhost:0 and localhost:1 might be on the same physical host, but as long as - // yarn considers them different nodes, we consider them different too - Assert.assertEquals(3, - spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name()) - .getValue()); - - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0"))); - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0"))); - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1"))); // 2 distinct node hosts were seen: localhost, otherhost Assert.assertEquals(2, - spy.getAllCounters().getGroup(DAGCounter.class.getName()) - .findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name()) + spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name()) .getValue()); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost")); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost")); + Assert.assertEquals(10, spy.getAllCounters().getGroup(DAGCounter.class.getName()) .findCounter(DAGCounter.NODE_TOTAL_COUNT.name()) .getValue()); - - Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost")); - Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost")); } private DAGImpl getDagSpy() { From 6a73672026faea45f018f40908a811bff98dea09 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Mon, 24 Jun 2024 21:59:34 +0200 Subject: [PATCH 3/3] use taskSchedulers[0] in countAllNodes --- .../tez/dag/app/rm/TaskSchedulerManager.java | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 148429c986..e311c23e86 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -235,16 +235,11 @@ public int getNumClusterNodes(boolean tryUpdate){ } private int countAllNodes() { - int nodeCount = 0; - int schedulerId = 0; try { - for (schedulerId = 0; schedulerId < taskSchedulers.length; schedulerId++) { - nodeCount += taskSchedulers[schedulerId].getClusterNodeCount(); - } + return taskSchedulers[0].getClusterNodeCount(); } catch (Exception e) { - handleTaskSchedulerException(e, schedulerId); + return handleTaskSchedulerExceptionWhileGettingNodeCount(e); } - return nodeCount; } public Resource getAvailableResources(int schedulerId) { @@ -903,12 +898,7 @@ public float getProgress(int schedulerId) { // Doubles as a mechanism to update node counts periodically. Hence schedulerId required. // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in. - int nodeCount = 0; - try { - nodeCount = taskSchedulers[0].getClusterNodeCount(); - } catch (Exception e) { - handleTaskSchedulerException(e, 0); - } + int nodeCount = countAllNodes(); if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId)); @@ -916,9 +906,9 @@ public float getProgress(int schedulerId) { return dagAppMaster.getProgress(); } - private void handleTaskSchedulerException(Exception e, int schedulerId) { + private int handleTaskSchedulerExceptionWhileGettingNodeCount(Exception e) { String msg = "Error in TaskScheduler while getting node count" - + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(0, appContext); LOG.error(msg, e); sendEvent( new DAGAppMasterEventUserServiceFatalError(