aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-11-02 10:23:30 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-02 10:23:30 -0800
commita930e624eb9feb0f7d37d99dcb8178feb9c0f177 (patch)
treea514c4db4a1695da3c5fb4dab1e3c98a9798a5e5 /yarn/src/test/scala
parent74ba95228d71a6dc4e95fef19f41dabe7c363d9e (diff)
downloadspark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.tar.gz
spark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.tar.bz2
spark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.zip
[SPARK-9817][YARN] Improve the locality calculation of containers by taking pending container requests into consideration
This is a follow-up PR to further improve the locality calculation by considering the pending containers' requests. Since the locality preferences of tasks may shift from time to time, the current localities of pending container requests may not fully match the new preferences; this PR improves it by removing outdated, unmatched container requests and replacing them with new requests. sryza please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #8100 from jerryshao/SPARK-9817.
Diffstat (limited to 'yarn/src/test/scala')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala38
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala26
2 files changed, 46 insertions, 18 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index b7fe4ccc67..afb4b691b5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.SparkFunSuite
@@ -26,6 +27,9 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
private val yarnAllocatorSuite = new YarnAllocatorSuite
import yarnAllocatorSuite._
+ def createContainerRequest(nodes: Array[String]): ContainerRequest =
+ new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+
override def beforeEach() {
yarnAllocatorSuite.beforeEach()
}
@@ -44,7 +48,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(
Array("host3", "host4", "host5"),
@@ -66,7 +71,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) ===
Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -86,7 +92,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
}
@@ -105,7 +112,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null, null, null))
}
@@ -118,8 +126,28 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 0, Map.empty, handler.allocatedHostToContainersMap)
+ 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null))
}
+
+ test("allocate locality preferred containers by considering the localities of pending requests") {
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val pendingAllocationRequests = Seq(
+ createContainerRequest(Array("host2", "host3")),
+ createContainerRequest(Array("host1", "host4")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, pendingAllocationRequests)
+
+ assert(localities.map(_.nodes) === Array(Array("host3")))
+ }
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5d05f514ad..bd80036c5c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,7 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -134,7 +134,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
@@ -154,7 +154,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -174,11 +174,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -189,18 +189,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("decrease total requested executors to less than currently running") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -210,7 +210,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (0)
+ handler.getPendingAllocate.size should be (0)
handler.getNumExecutorsRunning should be (2)
}
@@ -218,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -233,14 +233,14 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -255,7 +255,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.processCompletedContainers(statuses.toSeq)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}