Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala | 60
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 73
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala | 38
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 26
5 files changed, 159 insertions, 40 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b4d9990ce..c6a6d7ac56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -375,7 +375,7 @@ private[spark] class ApplicationMaster(
}
}
try {
- val numPendingAllocate = allocator.getNumPendingAllocate
+ val numPendingAllocate = allocator.getPendingAllocate.size
val sleepInterval =
if (numPendingAllocate > 0) {
val currentAllocationInterval =
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 081780204e..2ec189de7c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -18,9 +18,11 @@
package org.apache.spark.deploy.yarn
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkConf
@@ -30,8 +32,8 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
/**
* This strategy calculates the optimal locality preferences of YARN containers by considering
* the node ratio of pending tasks, the number of required cores/containers, and the locality of
- * existing containers. The target of this algorithm is to maximize the number of tasks that
- * would run locally.
+ * existing containers and pending container requests. The goal of this algorithm is to maximize
+ * the number of tasks that would run locally.
*
* Consider a situation in which we have 20 tasks that require (host1, host2, host3)
* and 10 tasks that require (host1, host2, host4), besides each container has 2 cores
@@ -91,6 +93,11 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* @param numLocalityAwareTasks number of locality required tasks
* @param hostToLocalTaskCount a map to store the preferred hostname and possible task
* numbers running on it, used as hints for container allocation
+ * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+ * expected locality preference by considering the existing
+ * containers
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the current required tasks.
* @return node localities and rack localities, each locality is an array of string,
* the length of localities is the same as number of containers
*/
@@ -98,10 +105,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
numContainer: Int,
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
- allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+ localityMatchedPendingAllocations: Seq[ContainerRequest]
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
- numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap)
+ numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
+ localityMatchedPendingAllocations)
val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
// The number of containers to allocate, divided into two groups, one with preferred locality,
@@ -158,20 +167,28 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* @param localityAwareTasks number of locality aware tasks
* @param hostToLocalTaskCount a map to store the preferred hostname and possible task
* numbers running on it, used as hints for container allocation
+ * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+ * expected locality preference by considering the existing
+ * containers
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the current required tasks.
* @return a map with hostname as key and required number of containers on this host as value
*/
private def expectedHostToContainerCount(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
- allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+ localityMatchedPendingAllocations: Seq[ContainerRequest]
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+ val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
+
hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
- val existedCount = allocatedHostToContainersMap.get(host)
- .map(_.size)
- .getOrElse(0)
+ // Take the locality of pending containers into consideration
+ val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
+ pendingHostToContainersMap.getOrElse(host, 0.0)
// If existing container can not fully satisfy the expected number of container,
// the required container number is expected count minus existed count. Otherwise the
@@ -179,4 +196,31 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
(host, math.max(0, (expectedCount - existedCount).ceil.toInt))
}
}
+
+ /**
+ * Given the locality ratio of hosts and the number of pending container requests, calculate
+ * the expected number of pending containers on each host.
+ *
+ * For example, if the current locality ratio of hosts is Host1 : Host2 : Host3 = 20 : 20 : 10
+ * and there are 3 pending container requests, the expected number of containers on
+ * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6.
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the current required tasks.
+ * @return a Map with hostname as key and possible number of containers on this host as value
+ */
+ private def pendingHostToContainerCount(
+ localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, Double] = {
+ val pendingHostToContainerCount = new HashMap[String, Int]()
+ localityMatchedPendingAllocations.foreach { cr =>
+ cr.getNodes.asScala.foreach { n =>
+ val count = pendingHostToContainerCount.getOrElse(n, 0) + 1
+ pendingHostToContainerCount(n) = count
+ }
+ }
+
+ val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
+ val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
+ pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
+ .toMap
+ }
}
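
For reference, a minimal standalone Scala sketch of the placement arithmetic in this file (the pending-request ratio and the expected per-host container count). It is an illustrative model only, not the actual LocalityPreferredContainerPlacementStrategy class; all names and numbers below are invented for the example.

// Minimal sketch, assuming pending requests are summarised as per-host mention counts and
// that tasks map to containers as described in the class doc above. Not the real class.
object PlacementMathSketch {

  // Distribute the pending requests across hosts proportionally to how often each host
  // appears in the pending requests' node lists (the 1.2 : 1.2 : 0.6 example above).
  def pendingHostToContainerCount(
      hostMentions: Map[String, Int],
      numPendingRequests: Int): Map[String, Double] = {
    val totalMentions = hostMentions.values.sum.toDouble
    hostMentions.mapValues(_ * numPendingRequests / totalMentions).toMap
  }

  // required(host) = max(0, ceil(expected(host) - allocated(host) - pending(host)))
  def requiredHostToContainerCount(
      numPendingExecutors: Int,
      hostToLocalTaskCount: Map[String, Int],
      allocated: Map[String, Int],
      pending: Map[String, Double]): Map[String, Int] = {
    val totalLocalTasks = hostToLocalTaskCount.values.sum.toDouble
    hostToLocalTaskCount.map { case (host, taskCount) =>
      val expected = taskCount * numPendingExecutors / totalLocalTasks
      val existing = allocated.getOrElse(host, 0) + pending.getOrElse(host, 0.0)
      host -> math.max(0, (expected - existing).ceil.toInt)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hosts appear in the pending requests' node lists in ratio 20 : 20 : 10, 3 requests pending.
    val pending = pendingHostToContainerCount(
      Map("host1" -> 20, "host2" -> 20, "host3" -> 10), numPendingRequests = 3)
    println(pending)   // Map(host1 -> 1.2, host2 -> 1.2, host3 -> 0.6)

    // 5 executors still wanted, 2 containers already on host1 and 1 on host2:
    // only host3 still needs a locality-preferred container.
    val required = requiredHostToContainerCount(
      numPendingExecutors = 5,
      hostToLocalTaskCount = Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
      allocated = Map("host1" -> 2, "host2" -> 1),
      pending = pending)
    println(required)  // Map(host1 -> 0, host2 -> 0, host3 -> 1)
  }
}
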
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 875bbd4e4e..a0cf1b4aa4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -157,15 +157,19 @@ private[yarn] class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed
/**
- * Number of container requests that have not yet been fulfilled.
+ * A sequence of pending container requests that have not yet been fulfilled.
*/
- def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+ def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
/**
- * Number of container requests at the given location that have not yet been fulfilled.
+ * A sequence of pending container requests at the given location that have not yet been
+ * fulfilled.
*/
- private def getNumPendingAtLocation(location: String): Int =
- amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala.map(_.size).sum
+ private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
+ amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
+ .flatMap(_.asScala)
+ .toSeq
+ }
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
@@ -251,20 +255,31 @@ private[yarn] class YarnAllocator(
* Visible for testing.
*/
def updateResourceRequests(): Unit = {
- val numPendingAllocate = getNumPendingAllocate
+ val pendingAllocate = getPendingAllocate
+ val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
- // TODO. Consider locality preferences of pending container requests.
- // Since the last time we made container requests, stages have completed and been submitted,
- // and that the localities at which we requested our pending executors
- // no longer apply to our current needs. We should consider to remove all outstanding
- // container requests and add requests anew each time to avoid this.
if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+ // Split the pending container requests into three groups: locality matched, locality
+ // unmatched and locality free. Locality matched requests are taken into account by the
+ // container placement calculation and treated as if they were already allocated containers.
+ // Locality unmatched and locality free requests are cancelled, since the required locality
+ // preferences have changed, and their containers are requested anew through the container
+ // placement strategy.
+ val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+ hostToLocalTaskCounts, pendingAllocate)
+
+ // Remove the outdated container requests and recalculate the number of containers to request
+ localityUnMatched.foreach(amClient.removeContainerRequest)
+ localityFree.foreach(amClient.removeContainerRequest)
+ val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
- missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap)
+ updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
+ allocatedHostToContainersMap, localityMatched)
for (locality <- containerLocalityPreferences) {
val request = createContainerRequest(resource, locality.nodes, locality.racks)
@@ -291,7 +306,7 @@ private[yarn] class YarnAllocator(
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
- protected def createContainerRequest(
+ private def createContainerRequest(
resource: Resource,
nodes: Array[String],
racks: Array[String]): ContainerRequest = {
@@ -535,6 +550,38 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ /**
+ * Split the pending container requests into three groups based on the current localities of
+ * pending tasks.
+ * @param hostToLocalTaskCount a map of preferred hostname to possible task counts, used as a
+ * container placement hint.
+ * @param pendingAllocations A sequence of pending container requests.
+ * @return A tuple of three sequences: locality matched container requests, locality unmatched
+ * container requests, and locality free container requests.
+ */
+ private def splitPendingAllocationsByLocality(
+ hostToLocalTaskCount: Map[String, Int],
+ pendingAllocations: Seq[ContainerRequest]
+ ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
+ val localityMatched = ArrayBuffer[ContainerRequest]()
+ val localityUnMatched = ArrayBuffer[ContainerRequest]()
+ val localityFree = ArrayBuffer[ContainerRequest]()
+
+ val preferredHosts = hostToLocalTaskCount.keySet
+ pendingAllocations.foreach { cr =>
+ val nodes = cr.getNodes
+ if (nodes == null) {
+ localityFree += cr
+ } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
+ localityMatched += cr
+ } else {
+ localityUnMatched += cr
+ }
+ }
+
+ (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
+ }
}
private object YarnAllocator {
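
To make the request bookkeeping in updateResourceRequests easier to follow, here is a small self-contained sketch of the three-way split and the recalculated request count. It models a pending request as an optional node list rather than a YARN ContainerRequest, so it illustrates the logic only and is not the allocator itself.

// Simplified sketch, assuming a pending request is just an optional list of preferred nodes.
object SplitPendingSketch {

  // Returns (locality matched, locality unmatched, locality free) requests, mirroring
  // splitPendingAllocationsByLocality above.
  def splitByLocality(
      preferredHosts: Set[String],
      pendingNodeLists: Seq[Option[Seq[String]]])
    : (Seq[Option[Seq[String]]], Seq[Option[Seq[String]]], Seq[Option[Seq[String]]]) = {
    val (withNodes, localityFree) = pendingNodeLists.partition(_.isDefined)
    val (matched, unmatched) = withNodes.partition(_.get.exists(preferredHosts.contains))
    (matched, unmatched, localityFree)
  }

  def main(args: Array[String]): Unit = {
    val preferredHosts = Set("host1", "host2", "host3")
    val pendingRequests = Seq(
      Some(Seq("host2", "host4")),   // matched: host2 is still a preferred host
      Some(Seq("host5", "host6")),   // unmatched: its locality preference is now stale
      None)                          // locality free: no node preference at all

    val (matched, unmatched, free) = splitByLocality(preferredHosts, pendingRequests)

    // Unmatched and locality free requests are cancelled and folded back into the number of
    // containers to request; matched requests feed the placement strategy as pending containers.
    val missing = 2
    val updatedNumContainer = missing + unmatched.size + free.size
    println((matched.size, unmatched.size, free.size, updatedNumContainer))  // (1,1,1,4)
  }
}
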
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index b7fe4ccc67..afb4b691b5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.SparkFunSuite
@@ -26,6 +27,9 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
private val yarnAllocatorSuite = new YarnAllocatorSuite
import yarnAllocatorSuite._
+ def createContainerRequest(nodes: Array[String]): ContainerRequest =
+ new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+
override def beforeEach() {
yarnAllocatorSuite.beforeEach()
}
@@ -44,7 +48,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(
Array("host3", "host4", "host5"),
@@ -66,7 +71,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) ===
Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -86,7 +92,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
}
@@ -105,7 +112,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null, null, null))
}
@@ -118,8 +126,28 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 0, Map.empty, handler.allocatedHostToContainersMap)
+ 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null))
}
+
+ test("allocate locality preferred containers by considering the localities of pending requests") {
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val pendingAllocationRequests = Seq(
+ createContainerRequest(Array("host2", "host3")),
+ createContainerRequest(Array("host1", "host4")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, pendingAllocationRequests)
+
+ assert(localities.map(_.nodes) === Array(Array("host3")))
+ }
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5d05f514ad..bd80036c5c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,7 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -134,7 +134,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
@@ -154,7 +154,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -174,11 +174,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -189,18 +189,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("decrease total requested executors to less than currently running") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -210,7 +210,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (0)
+ handler.getPendingAllocate.size should be (0)
handler.getNumExecutorsRunning should be (2)
}
@@ -218,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -233,14 +233,14 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -255,7 +255,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.processCompletedContainers(statuses.toSeq)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}