From bb499355bb89ee81f99340216430b10bae901798 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Fri, 21 Jun 2024 16:47:22 +0200 Subject: [PATCH] rework Change-Id: Ic72fbe3d490e2729ece74f07b42891d1c191b1c5 --- .../tez/common/counters/DAGCounter.java | 8 +--- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 11 ++--- .../tez/dag/app/rm/TaskSchedulerManager.java | 42 +++++++++++++++---- .../tez/dag/app/dag/impl/TestDAGImpl.java | 22 +++------- 4 files changed, 43 insertions(+), 40 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index ffc4e39b4e..23c1978430 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -91,16 +91,10 @@ public enum DAGCounter { /* * Number of nodes to which task attempts were assigned in this DAG. - * Nodes are distinguished by the Yarn NodeId. + * Nodes are distinguished by the Yarn NodeId.getHost(). */ NODE_USED_COUNT, - /* - * Number of node hosts to which task attempts were assigned in this DAG. - * Nodes are distinguished by Yarn NodeId.getHost() - */ - NODE_HOSTS_USED_COUNT, - /* * Total number of nodes visible to the task scheduler (regardless of * task assignments). This is typically exposed by a resource manager diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 98492a3cc8..a5e040f9cd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -253,10 +253,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); private final Set containersUsedByCurrentDAG = new HashSet<>(); @VisibleForTesting - final Set nodesUsedByCurrentDAG = new HashSet<>(); - @VisibleForTesting - final Set nodeHostsUsedByCurrentDAG = new HashSet<>(); - + final Set nodesUsedByCurrentDAG = new HashSet<>(); protected static final StateMachineFactory @@ -2588,14 +2585,12 @@ void stopVertexServices() { @Override public void addUsedContainer(Container container) { containersUsedByCurrentDAG.add(container.getId()); - nodesUsedByCurrentDAG.add(container.getNodeId()); - nodeHostsUsedByCurrentDAG.add(container.getNodeId().getHost()); + nodesUsedByCurrentDAG.add(container.getNodeId().getHost()); } private void updateCounters() { setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size()); setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size()); - setDagCounter(DAGCounter.NODE_HOSTS_USED_COUNT, nodeHostsUsedByCurrentDAG.size()); - setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes()); + setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true)); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 27adbc1fc3..148429c986 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -224,9 +224,29 @@ public void setSignalled(boolean isSignalled) { } public int getNumClusterNodes() { + return getNumClusterNodes(false); + } + + public int getNumClusterNodes(boolean tryUpdate){ + if (cachedNodeCount == -1 && tryUpdate){ + cachedNodeCount = countAllNodes(); + } return cachedNodeCount; } - + + private int countAllNodes() { + int nodeCount = 0; + int schedulerId = 0; + try { + for (schedulerId = 0; schedulerId < taskSchedulers.length; schedulerId++) { + nodeCount += taskSchedulers[schedulerId].getClusterNodeCount(); + } + } catch (Exception e) { + handleTaskSchedulerException(e, schedulerId); + } + return nodeCount; + } + public Resource getAvailableResources(int schedulerId) { try { return taskSchedulers[schedulerId].getAvailableResources(); @@ -887,14 +907,7 @@ public float getProgress(int schedulerId) { try { nodeCount = taskSchedulers[0].getClusterNodeCount(); } catch (Exception e) { - String msg = "Error in TaskScheduler while getting node count" - + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); - LOG.error(msg, e); - sendEvent( - new DAGAppMasterEventUserServiceFatalError( - DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, - msg, e)); - throw new RuntimeException(e); + handleTaskSchedulerException(e, 0); } if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; @@ -903,6 +916,17 @@ public float getProgress(int schedulerId) { return dagAppMaster.getProgress(); } + private void handleTaskSchedulerException(Exception e, int schedulerId) { + String msg = "Error in TaskScheduler while getting node count" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + throw new RuntimeException(e); + } + public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagInfo) { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 8baab5e35b..46c4fe1cff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -2399,35 +2400,24 @@ public void testNodesUsedCounter() { spy.addUsedContainer(containerOnDifferentHost); spy.addUsedContainer(containerOnSameHostWithDifferentPort); - when(taskSchedulerManager.getNumClusterNodes()).thenReturn(10); + when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10); spy.onFinish(); // 4 calls to addUsedContainer verify(spy, times(4)).addUsedContainer(any(Container.class)); - // 3 nodes were used: localhost:0, otherhost:0, localhost:1 - // localhost:0 and localhost:1 might be on the same physical host, but as long as - // yarn considers them different nodes, we consider them different too - Assert.assertEquals(3, - spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name()) - .getValue()); - - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:0"))); - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("otherhost:0"))); - Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains(NodeId.fromString("localhost:1"))); // 2 distinct node hosts were seen: localhost, otherhost Assert.assertEquals(2, - spy.getAllCounters().getGroup(DAGCounter.class.getName()) - .findCounter(DAGCounter.NODE_HOSTS_USED_COUNT.name()) + spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name()) .getValue()); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost")); + Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost")); + Assert.assertEquals(10, spy.getAllCounters().getGroup(DAGCounter.class.getName()) .findCounter(DAGCounter.NODE_TOTAL_COUNT.name()) .getValue()); - - Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("localhost")); - Assert.assertTrue(spy.nodeHostsUsedByCurrentDAG.contains("otherhost")); } private DAGImpl getDagSpy() {