diff options
author | jerryshao <sshao@hortonworks.com> | 2015-11-02 10:23:30 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-11-02 10:23:30 -0800 |
commit | a930e624eb9feb0f7d37d99dcb8178feb9c0f177 (patch) | |
tree | a514c4db4a1695da3c5fb4dab1e3c98a9798a5e5 /yarn/src/test/scala | |
parent | 74ba95228d71a6dc4e95fef19f41dabe7c363d9e (diff) | |
download | spark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.tar.gz spark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.tar.bz2 spark-a930e624eb9feb0f7d37d99dcb8178feb9c0f177.zip |
[SPARK-9817][YARN] Improve the locality calculation of containers by taking pending container requests into consideraion
This is a follow-up PR to further improve the locality calculation by considering the pending container's request. Since the locality preferences of tasks may be shifted from time to time, current localities of pending container requests may not fully match the new preferences, this PR improve it by removing outdated, unmatched container requests and replace with new requests.
sryza please help to review, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes #8100 from jerryshao/SPARK-9817.
Diffstat (limited to 'yarn/src/test/scala')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala | 38 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 26 |
2 files changed, 46 insertions, 18 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala index b7fe4ccc67..afb4b691b5 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.SparkFunSuite @@ -26,6 +27,9 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B private val yarnAllocatorSuite = new YarnAllocatorSuite import yarnAllocatorSuite._ + def createContainerRequest(nodes: Array[String]): ContainerRequest = + new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) + override def beforeEach() { yarnAllocatorSuite.beforeEach() } @@ -44,7 +48,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap) + 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), + handler.allocatedHostToContainersMap, Seq.empty) assert(localities.map(_.nodes) === Array( Array("host3", "host4", "host5"), @@ -66,7 +71,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B )) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), + handler.allocatedHostToContainersMap, Seq.empty) assert(localities.map(_.nodes) === Array(null, Array("host2", "host3"), Array("host2", "host3"))) @@ -86,7 +92,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B )) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), + handler.allocatedHostToContainersMap, Seq.empty) assert(localities.map(_.nodes) === Array(Array("host2", "host3"))) } @@ -105,7 +112,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B )) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), + handler.allocatedHostToContainersMap, Seq.empty) assert(localities.map(_.nodes) === Array(null, null, null)) } @@ -118,8 +126,28 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 0, Map.empty, handler.allocatedHostToContainersMap) + 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty) assert(localities.map(_.nodes) === Array(null)) } + + test("allocate locality preferred containers by considering the localities of pending requests") { + val handler = createAllocator(3) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") + )) + + val pendingAllocationRequests = Seq( + createContainerRequest(Array("host2", "host3")), + createContainerRequest(Array("host1", "host4"))) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), + handler.allocatedHostToContainersMap, pendingAllocationRequests) + + assert(localities.map(_.nodes) === Array(Array("host3"))) + } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 5d05f514ad..bd80036c5c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -116,7 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val handler = createAllocator(1) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (1) + handler.getPendingAllocate.size should be (1) val container = createContainer("host1") handler.handleAllocatedContainers(Array(container)) @@ -134,7 +134,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val handler = createAllocator(4) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (4) + handler.getPendingAllocate.size should be (4) val container1 = createContainer("host1") val container2 = createContainer("host1") @@ -154,7 +154,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val handler = createAllocator(2) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (2) + handler.getPendingAllocate.size should be (2) val container1 = createContainer("host1") val container2 = createContainer("host2") @@ -174,11 +174,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val handler = createAllocator(4) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (4) + handler.getPendingAllocate.size should be (4) handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) handler.updateResourceRequests() - handler.getNumPendingAllocate should be (3) + handler.getPendingAllocate.size should be (3) val container = createContainer("host1") handler.handleAllocatedContainers(Array(container)) @@ -189,18 +189,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty) handler.updateResourceRequests() - handler.getNumPendingAllocate should be (1) + handler.getPendingAllocate.size should be (1) } test("decrease total requested executors to less than currently running") { val handler = createAllocator(4) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (4) + handler.getPendingAllocate.size should be (4) handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) handler.updateResourceRequests() - handler.getNumPendingAllocate should be (3) + handler.getPendingAllocate.size should be (3) val container1 = createContainer("host1") val container2 = createContainer("host2") @@ -210,7 +210,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty) handler.updateResourceRequests() - handler.getNumPendingAllocate should be (0) + handler.getPendingAllocate.size should be (0) handler.getNumExecutorsRunning should be (2) } @@ -218,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val handler = createAllocator(4) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (4) + handler.getPendingAllocate.size should be (4) val container1 = createContainer("host1") val container2 = createContainer("host2") @@ -233,14 +233,14 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.updateResourceRequests() handler.processCompletedContainers(statuses.toSeq) handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (1) + handler.getPendingAllocate.size should be (1) } test("lost executor removed from backend") { val handler = createAllocator(4) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (4) + handler.getPendingAllocate.size should be (4) val container1 = createContainer("host1") val container2 = createContainer("host2") @@ -255,7 +255,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.processCompletedContainers(statuses.toSeq) handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) - handler.getNumPendingAllocate should be (2) + handler.getPendingAllocate.size should be (2) handler.getNumExecutorsFailed should be (2) handler.getNumUnexpectedContainerRelease should be (2) } |