diff options
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 58 |
1 file changed, 46 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index a96cb4957b..e34cd8d1b7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -265,25 +265,52 @@ private[yarn] class YarnAllocator( // For locality unmatched and locality free container requests, cancel these container // requests, since required locality preference has been changed, recalculating using // container placement strategy. - val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality( + val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( hostToLocalTaskCounts, pendingAllocate) - // Remove the outdated container request and recalculate the requested container number - localityUnMatched.foreach(amClient.removeContainerRequest) - localityFree.foreach(amClient.removeContainerRequest) - val updatedNumContainer = missing + localityUnMatched.size + localityFree.size + // cancel "stale" requests for locations that are no longer needed + staleRequests.foreach { stale => + amClient.removeContainerRequest(stale) + } + val cancelledContainers = staleRequests.size + logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)") + + // consider the number of new containers and cancelled stale containers available + val availableContainers = missing + cancelledContainers + + // to maximize locality, include requests with no locality preference that can be cancelled + val potentialContainers = availableContainers + anyHostRequests.size val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( - updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts, - allocatedHostToContainersMap, localityMatched) + potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts, + 
allocatedHostToContainersMap, localRequests) + + val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest] + containerLocalityPreferences.foreach { + case ContainerLocalityPreferences(nodes, racks) if nodes != null => + newLocalityRequests.append(createContainerRequest(resource, nodes, racks)) + case _ => + } - for (locality <- containerLocalityPreferences) { - val request = createContainerRequest(resource, locality.nodes, locality.racks) + if (availableContainers >= newLocalityRequests.size) { + // more containers are available than needed for locality, fill in requests for any host + for (i <- 0 until (availableContainers - newLocalityRequests.size)) { + newLocalityRequests.append(createContainerRequest(resource, null, null)) + } + } else { + val numToCancel = newLocalityRequests.size - availableContainers + // cancel some requests without locality preferences to schedule more local containers + anyHostRequests.slice(0, numToCancel).foreach { nonLocal => + amClient.removeContainerRequest(nonLocal) + } + logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality") + } + + newLocalityRequests.foreach { request => amClient.addContainerRequest(request) - val nodes = request.getNodes - val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last - logInfo(s"Container request (host: $hostStr, capability: $resource)") + logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)") } + } else if (missing < 0) { val numToCancel = math.min(numPendingAllocate, -missing) logInfo(s"Canceling requests for $numToCancel executor containers") @@ -298,6 +325,13 @@ private[yarn] class YarnAllocator( } } + private def hostStr(request: ContainerRequest): String = { + Option(request.getNodes) match { + case Some(nodes) => nodes.asScala.mkString(",") + case None => "Any" + } + } + /** * Creates a container request, handling the reflection required to use YARN features that were * added in recent 
versions. |