aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorRyan Blue <blue@apache.org>2016-03-14 11:18:32 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-03-14 11:18:37 -0700
commit63f642aea31fe0d202ce585681d51e7ac1715ba7 (patch)
tree92ec9f8cdcc3cca1db3aa6dab9f0764d2a8081b0 /yarn
parent45f8053be5c635b50c7b4ef5a0dc75d30f411291 (diff)
downloadspark-63f642aea31fe0d202ce585681d51e7ac1715ba7.tar.gz
spark-63f642aea31fe0d202ce585681d51e7ac1715ba7.tar.bz2
spark-63f642aea31fe0d202ce585681d51e7ac1715ba7.zip
[SPARK-13779][YARN] Avoid cancelling non-local container requests.
To maximize locality, the YarnAllocator would cancel any requests with a stale locality preference or no locality preference. This assumed that the majority of tasks had locality preferences, but may not be the case when scanning S3. This caused container requests for S3 tasks to be constantly cancelled and resubmitted. This changes the allocator's logic to cancel only stale requests and just enough requests without locality preferences to submit requests with locality preferences. This avoids cancelling requests without locality preferences that would be resubmitted without locality preferences. We've deployed this patch on our clusters and verified that jobs that couldn't get executors because they kept canceling and resubmitting requests are fixed. Large jobs are running fine. Author: Ryan Blue <blue@apache.org> Closes #11612 from rdblue/SPARK-13779-fix-yarn-allocator-requests.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala58
1 files changed, 46 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a96cb4957b..e34cd8d1b7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -265,25 +265,52 @@ private[yarn] class YarnAllocator(
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
- val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+ val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
- // Remove the outdated container request and recalculate the requested container number
- localityUnMatched.foreach(amClient.removeContainerRequest)
- localityFree.foreach(amClient.removeContainerRequest)
- val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+ // cancel "stale" requests for locations that are no longer needed
+ staleRequests.foreach { stale =>
+ amClient.removeContainerRequest(stale)
+ }
+ val cancelledContainers = staleRequests.size
+ logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+
+ // consider the number of new containers and cancelled stale containers available
+ val availableContainers = missing + cancelledContainers
+
+ // to maximize locality, include requests with no locality preference that can be cancelled
+ val potentialContainers = availableContainers + anyHostRequests.size
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
- updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
- allocatedHostToContainersMap, localityMatched)
+ potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+ allocatedHostToContainersMap, localRequests)
+
+ val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+ containerLocalityPreferences.foreach {
+ case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+ newLocalityRequests.append(createContainerRequest(resource, nodes, racks))
+ case _ =>
+ }
- for (locality <- containerLocalityPreferences) {
- val request = createContainerRequest(resource, locality.nodes, locality.racks)
+ if (availableContainers >= newLocalityRequests.size) {
+ // more containers are available than needed for locality, fill in requests for any host
+ for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+ newLocalityRequests.append(createContainerRequest(resource, null, null))
+ }
+ } else {
+ val numToCancel = newLocalityRequests.size - availableContainers
+ // cancel some requests without locality preferences to schedule more local containers
+ anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+ amClient.removeContainerRequest(nonLocal)
+ }
+ logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+ }
+
+ newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
- val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
- logInfo(s"Container request (host: $hostStr, capability: $resource)")
+ logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
}
+
} else if (missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor containers")
@@ -298,6 +325,13 @@ private[yarn] class YarnAllocator(
}
}
+ private def hostStr(request: ContainerRequest): String = {
+ Option(request.getNodes) match {
+ case Some(nodes) => nodes.asScala.mkString(",")
+ case None => "Any"
+ }
+ }
+
/**
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.