diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 317f6ec80e59b..517c85d73a59f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -178,11 +178,22 @@ private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; + SchedulingPlacementSet schedulingPS = + appInfo.getSchedulingPlacementSet(schedulerKey); + if (null == schedulingPS){ + // This can happen when the pending resource was decreased by a + // different thread. 
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); + return ContainerAllocation.PRIORITY_SKIPPED; + } + String requestPartition = + schedulingPS.getPrimaryRequestedNodePartition(); + // Only do this when request associated with given scheduler key accepts // NO_LABEL under RESPECT_EXCLUSIVITY mode - if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, - appInfo.getSchedulingPlacementSet(schedulerKey) - .getPrimaryRequestedNodePartition())) { + if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, requestPartition)) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( schedulerKey); @@ -260,12 +271,9 @@ ContainerAllocation tryAllocateOnNode(Resource clusterResource, return result; } - public float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { + public float getLocalityWaitFactor(int uniqAsks, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = Math.max( - application.getSchedulingPlacementSet(schedulerKey) - .getUniqueLocationAsks() - 1, 0); + int requiredResources = Math.max(uniqAsks - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -295,10 +303,16 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, if (rmContext.getScheduler().getNumClusterNodes() == 0) { return false; } + + int uniqLocationAsks = 0; + SchedulingPlacementSet schedulingPS = + appInfo.getSchedulingPlacementSet(schedulerKey); + if (schedulingPS != null) { + uniqLocationAsks = schedulingPS.getUniqueLocationAsks(); + } // If we have only ANY requests for this schedulerKey, we should not // delay its scheduling. 
- if (application.getSchedulingPlacementSet(schedulerKey) - .getUniqueLocationAsks() == 1) { + if (uniqLocationAsks == 1) { return true; } @@ -312,7 +326,7 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, } else { long requiredContainers = application.getOutstandingAsksCount( schedulerKey); - float localityWaitFactor = getLocalityWaitFactor(schedulerKey, + float localityWaitFactor = getLocalityWaitFactor(uniqLocationAsks, rmContext.getScheduler().getNumClusterNodes()); // Cap the delay by the number of nodes in the cluster. return (Math.min(rmContext.getScheduler().getNumClusterNodes(), @@ -809,6 +823,12 @@ private ContainerAllocation allocate(Resource clusterResource, application.getAppSchedulingInfo().getSchedulingPlacementSet( schedulerKey); + // This could be null when #pending request decreased by another thread. + if (schedulingPS == null) { + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + result = ContainerAllocation.PRIORITY_SKIPPED; Iterator iter = schedulingPS.getPreferredNodeIterator(