aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala58
1 files changed, 46 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a96cb4957b..e34cd8d1b7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -265,25 +265,52 @@ private[yarn] class YarnAllocator(
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
- val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+ val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
- // Remove the outdated container request and recalculate the requested container number
- localityUnMatched.foreach(amClient.removeContainerRequest)
- localityFree.foreach(amClient.removeContainerRequest)
- val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+ // cancel "stale" requests for locations that are no longer needed
+ staleRequests.foreach { stale =>
+ amClient.removeContainerRequest(stale)
+ }
+ val cancelledContainers = staleRequests.size
+ logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+
+ // consider the number of new containers and cancelled stale containers available
+ val availableContainers = missing + cancelledContainers
+
+ // to maximize locality, include requests with no locality preference that can be cancelled
+ val potentialContainers = availableContainers + anyHostRequests.size
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
- updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
- allocatedHostToContainersMap, localityMatched)
+ potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+ allocatedHostToContainersMap, localRequests)
+
+ val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+ containerLocalityPreferences.foreach {
+ case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+ newLocalityRequests.append(createContainerRequest(resource, nodes, racks))
+ case _ =>
+ }
- for (locality <- containerLocalityPreferences) {
- val request = createContainerRequest(resource, locality.nodes, locality.racks)
+ if (availableContainers >= newLocalityRequests.size) {
+ // more containers are available than needed for locality, fill in requests for any host
+ for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+ newLocalityRequests.append(createContainerRequest(resource, null, null))
+ }
+ } else {
+ val numToCancel = newLocalityRequests.size - availableContainers
+ // cancel some requests without locality preferences to schedule more local containers
+ anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+ amClient.removeContainerRequest(nonLocal)
+ }
+ logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+ }
+
+ newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
- val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
- logInfo(s"Container request (host: $hostStr, capability: $resource)")
+ logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
}
+
} else if (missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor containers")
@@ -298,6 +325,13 @@ private[yarn] class YarnAllocator(
}
}
+ private def hostStr(request: ContainerRequest): String = {
+ Option(request.getNodes) match {
+ case Some(nodes) => nodes.asScala.mkString(",")
+ case None => "Any"
+ }
+ }
+
/**
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.