author    jerryshao <sshao@hortonworks.com>    2015-11-02 10:23:30 -0800
committer Marcelo Vanzin <vanzin@cloudera.com> 2015-11-02 10:23:30 -0800
commit    a930e624eb9feb0f7d37d99dcb8178feb9c0f177 (patch)
tree      a514c4db4a1695da3c5fb4dab1e3c98a9798a5e5 /yarn
parent    74ba95228d71a6dc4e95fef19f41dabe7c363d9e (diff)
[SPARK-9817][YARN] Improve the locality calculation of containers by taking pending container requests into consideration
This is a follow-up PR that further improves the locality calculation by taking pending container requests into account. Since the locality preferences of tasks may shift over time, the localities of pending container requests may no longer match the new preferences; this PR improves on that by removing outdated, unmatched container requests and replacing them with new ones. sryza please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #8100 from jerryshao/SPARK-9817.
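To make the approach concrete, here is a small, self-contained Scala sketch of the idea, not the patch's code: it uses a simplified PendingRequest stand-in instead of YARN's ContainerRequest, and plain counters instead of the allocator's state, purely to show how pending requests are split by locality and how cancelled requests are added back to the number of new containers to request. All names and numbers below are illustrative.

// Simplified stand-in for AMRMClient.ContainerRequest: `nodes` is None for
// locality-free requests, Some(hosts) for requests with node preferences.
case class PendingRequest(nodes: Option[Seq[String]])

object LocalityResyncSketch {

  // Split pending requests into (matched, unmatched, locality-free), mirroring
  // the splitPendingAllocationsByLocality helper introduced by this patch.
  def splitByLocality(
      preferredHosts: Set[String],
      pending: Seq[PendingRequest])
    : (Seq[PendingRequest], Seq[PendingRequest], Seq[PendingRequest]) = {
    val (withNodes, localityFree) = pending.partition(_.nodes.isDefined)
    val (matched, unmatched) =
      withNodes.partition(_.nodes.get.exists(preferredHosts.contains))
    (matched, unmatched, localityFree)
  }

  def main(args: Array[String]): Unit = {
    val target = 6            // desired executors
    val running = 2           // executors already running
    val pending = Seq(
      PendingRequest(Some(Seq("host2", "host3"))),  // still matches preferences
      PendingRequest(Some(Seq("host9"))),           // outdated locality
      PendingRequest(None))                         // no locality preference

    val preferredHosts = Set("host1", "host2", "host3")
    val (matched, unmatched, localityFree) = splitByLocality(preferredHosts, pending)

    // Unmatched and locality-free requests would be cancelled via
    // amClient.removeContainerRequest, then re-requested with fresh localities.
    val missing = target - pending.size - running
    val toRequest = missing + unmatched.size + localityFree.size
    println(s"matched=${matched.size}, cancelled=${unmatched.size + localityFree.size}, " +
      s"newRequests=$toRequest")  // matched=1, cancelled=2, newRequests=3
  }
}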
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala | 60
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 73
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala | 38
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 26
5 files changed, 159 insertions, 40 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b4d9990ce..c6a6d7ac56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -375,7 +375,7 @@ private[spark] class ApplicationMaster(
}
}
try {
- val numPendingAllocate = allocator.getNumPendingAllocate
+ val numPendingAllocate = allocator.getPendingAllocate.size
val sleepInterval =
if (numPendingAllocate > 0) {
val currentAllocationInterval =
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 081780204e..2ec189de7c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -18,9 +18,11 @@
package org.apache.spark.deploy.yarn
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkConf
@@ -30,8 +32,8 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
/**
* This strategy is calculating the optimal locality preferences of YARN containers by considering
* the node ratio of pending tasks, number of required cores/containers and the locality of current
- * existing containers. The target of this algorithm is to maximize the number of tasks that
- * would run locally.
+ * existing and pending allocated containers. The target of this algorithm is to maximize the number
+ * of tasks that would run locally.
*
* Consider a situation in which we have 20 tasks that require (host1, host2, host3)
* and 10 tasks that require (host1, host2, host4), besides each container has 2 cores
@@ -91,6 +93,11 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* @param numLocalityAwareTasks number of locality required tasks
* @param hostToLocalTaskCount a map to store the preferred hostname and possible task
* numbers running on it, used as hints for container allocation
+ * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+ * expected locality preference by considering the existing
+ * containers
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the currently required tasks.
* @return node localities and rack localities, each locality is an array of string,
* the length of localities is the same as number of containers
*/
@@ -98,10 +105,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
numContainer: Int,
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
- allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+ localityMatchedPendingAllocations: Seq[ContainerRequest]
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
- numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap)
+ numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
+ localityMatchedPendingAllocations)
val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
// The number of containers to allocate, divided into two groups, one with preferred locality,
@@ -158,20 +167,28 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* @param localityAwareTasks number of locality aware tasks
* @param hostToLocalTaskCount a map to store the preferred hostname and possible task
* numbers running on it, used as hints for container allocation
+ * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+ * expected locality preference by considering the existing
+ * containers
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the currently required tasks.
* @return a map with hostname as key and required number of containers on this host as value
*/
private def expectedHostToContainerCount(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
- allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+ localityMatchedPendingAllocations: Seq[ContainerRequest]
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+ val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
+
hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
- val existedCount = allocatedHostToContainersMap.get(host)
- .map(_.size)
- .getOrElse(0)
+ // Take the locality of pending containers into consideration
+ val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
+ pendingHostToContainersMap.getOrElse(host, 0.0)
// If existing container can not fully satisfy the expected number of container,
// the required container number is expected count minus existed count. Otherwise the
@@ -179,4 +196,31 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
(host, math.max(0, (expectedCount - existedCount).ceil.toInt))
}
}
+
+ /**
+ * According to the locality ratio and the number of pending container requests, calculate
+ * the possible number of containers per host for the pending allocations.
+ *
+ * For example, if the current locality ratio of hosts is Host1 : Host2 : Host3 = 20 : 20 : 10
+ * and there are 3 pending container requests, the possible number of containers on
+ * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6.
+ * @param localityMatchedPendingAllocations A sequence of pending container requests that
+ * match the localities of the currently required tasks.
+ * @return a Map with hostname as key and possible number of containers on this host as value
+ */
+ private def pendingHostToContainerCount(
+ localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, Double] = {
+ val pendingHostToContainerCount = new HashMap[String, Int]()
+ localityMatchedPendingAllocations.foreach { cr =>
+ cr.getNodes.asScala.foreach { n =>
+ val count = pendingHostToContainerCount.getOrElse(n, 0) + 1
+ pendingHostToContainerCount(n) = count
+ }
+ }
+
+ val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
+ val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
+ pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
+ .toMap
+ }
}
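To make the new pendingHostToContainerCount weighting concrete, here is a standalone Scala sketch of the arithmetic described in the Scaladoc above: host occurrence counts across pending requests are scaled so they sum to the number of pending requests, and the weight then counts as "existing" capacity before computing how many more containers a host still needs. This is not the patch's code; the hostOccurrences, expected and allocated maps are illustrative values only.

object PendingWeightSketch {
  def main(args: Array[String]): Unit = {
    // Occurrences of each host across the node lists of pending, locality-matched
    // requests (the 20 : 20 : 10 ratio from the Scaladoc example).
    val hostOccurrences = Map("host1" -> 20, "host2" -> 20, "host3" -> 10)
    val numPendingRequests = 3

    val totalOccurrences = hostOccurrences.values.sum.toDouble
    // Scale so the weights sum to the number of pending requests:
    // host1 -> 1.2, host2 -> 1.2, host3 -> 0.6
    val pendingWeight =
      hostOccurrences.mapValues(_ * numPendingRequests / totalOccurrences).toMap
    println(pendingWeight)

    // The weight is added to the allocated count when deciding how many more
    // containers a host needs: max(0, ceil(expected - (allocated + pending))).
    val expected = Map("host1" -> 2.5, "host2" -> 2.5, "host3" -> 1.5) // illustrative
    val allocated = Map("host1" -> 1, "host2" -> 0, "host3" -> 0)
    val required = expected.map { case (host, exp) =>
      val existing = allocated.getOrElse(host, 0) + pendingWeight.getOrElse(host, 0.0)
      host -> math.max(0, (exp - existing).ceil.toInt)
    }
    println(required) // Map(host1 -> 1, host2 -> 2, host3 -> 1)
  }
}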
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 875bbd4e4e..a0cf1b4aa4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -157,15 +157,19 @@ private[yarn] class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed
/**
- * Number of container requests that have not yet been fulfilled.
+ * A sequence of pending container requests that have not yet been fulfilled.
*/
- def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+ def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
/**
- * Number of container requests at the given location that have not yet been fulfilled.
+ * A sequence of pending container requests at the given location that have not yet been
+ * fulfilled.
*/
- private def getNumPendingAtLocation(location: String): Int =
- amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala.map(_.size).sum
+ private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
+ amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
+ .flatMap(_.asScala)
+ .toSeq
+ }
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
@@ -251,20 +255,31 @@ private[yarn] class YarnAllocator(
* Visible for testing.
*/
def updateResourceRequests(): Unit = {
- val numPendingAllocate = getNumPendingAllocate
+ val pendingAllocate = getPendingAllocate
+ val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
- // TODO. Consider locality preferences of pending container requests.
- // Since the last time we made container requests, stages have completed and been submitted,
- // and that the localities at which we requested our pending executors
- // no longer apply to our current needs. We should consider to remove all outstanding
- // container requests and add requests anew each time to avoid this.
if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+ // Split the pending container requests into three groups: a locality-matched list, a
+ // locality-unmatched list and a locality-free list. The locality-matched requests are
+ // taken into account for container placement and treated as allocated containers.
+ // The locality-unmatched and locality-free requests are cancelled, since the required
+ // locality preferences have changed, and are recalculated using the container
+ // placement strategy.
+ val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+ hostToLocalTaskCounts, pendingAllocate)
+
+ // Remove the outdated container requests and recalculate the number of containers to request
+ localityUnMatched.foreach(amClient.removeContainerRequest)
+ localityFree.foreach(amClient.removeContainerRequest)
+ val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
- missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap)
+ updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
+ allocatedHostToContainersMap, localityMatched)
for (locality <- containerLocalityPreferences) {
val request = createContainerRequest(resource, locality.nodes, locality.racks)
@@ -291,7 +306,7 @@ private[yarn] class YarnAllocator(
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
- protected def createContainerRequest(
+ private def createContainerRequest(
resource: Resource,
nodes: Array[String],
racks: Array[String]): ContainerRequest = {
@@ -535,6 +550,38 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ /**
+ * Split the pending container requests into 3 groups based on current localities of pending
+ * tasks.
+ * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+ * container placement hint.
+ * @param pendingAllocations A sequence of pending container requests.
+ * @return A tuple of 3 sequences: the first is the locality-matched container
+ * requests, the second is the locality-unmatched container requests, and the third is the
+ * locality-free container requests.
+ */
+ private def splitPendingAllocationsByLocality(
+ hostToLocalTaskCount: Map[String, Int],
+ pendingAllocations: Seq[ContainerRequest]
+ ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
+ val localityMatched = ArrayBuffer[ContainerRequest]()
+ val localityUnMatched = ArrayBuffer[ContainerRequest]()
+ val localityFree = ArrayBuffer[ContainerRequest]()
+
+ val preferredHosts = hostToLocalTaskCount.keySet
+ pendingAllocations.foreach { cr =>
+ val nodes = cr.getNodes
+ if (nodes == null) {
+ localityFree += cr
+ } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
+ localityMatched += cr
+ } else {
+ localityUnMatched += cr
+ }
+ }
+
+ (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
+ }
}
private object YarnAllocator {
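The getPendingAllocate change above replaces a count with the actual pending ContainerRequest objects. AMRMClient.getMatchingRequests returns a java.util.List of java.util.Collections of requests, so the new helper flattens two levels of Java collections into a single Scala Seq. A minimal sketch of that flattening with JavaConverters, using plain strings in place of ContainerRequest instances:

import java.util
import scala.collection.JavaConverters._

object FlattenMatchingRequestsSketch {
  def main(args: Array[String]): Unit = {
    // Shaped like the result of AMRMClient.getMatchingRequests: a Java List of
    // Java Collections. Strings stand in for ContainerRequest instances here.
    val matching: util.List[util.Collection[String]] = util.Arrays.asList(
      util.Arrays.asList("request-1", "request-2"),
      util.Arrays.asList("request-3"))

    // Previously the allocator only summed the sizes; now it keeps the requests.
    val numPending = matching.asScala.map(_.size).sum
    val pending: Seq[String] = matching.asScala.flatMap(_.asScala).toSeq

    println(numPending)               // 3
    println(pending.mkString(", "))   // request-1, request-2, request-3 (order preserved)
  }
}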
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index b7fe4ccc67..afb4b691b5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.SparkFunSuite
@@ -26,6 +27,9 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
private val yarnAllocatorSuite = new YarnAllocatorSuite
import yarnAllocatorSuite._
+ def createContainerRequest(nodes: Array[String]): ContainerRequest =
+ new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+
override def beforeEach() {
yarnAllocatorSuite.beforeEach()
}
@@ -44,7 +48,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(
Array("host3", "host4", "host5"),
@@ -66,7 +71,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) ===
Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -86,7 +92,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
}
@@ -105,7 +112,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null, null, null))
}
@@ -118,8 +126,28 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
- 1, 0, Map.empty, handler.allocatedHostToContainersMap)
+ 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
assert(localities.map(_.nodes) === Array(null))
}
+
+ test("allocate locality preferred containers by considering the localities of pending requests") {
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val pendingAllocationRequests = Seq(
+ createContainerRequest(Array("host2", "host3")),
+ createContainerRequest(Array("host1", "host4")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+ handler.allocatedHostToContainersMap, pendingAllocationRequests)
+
+ assert(localities.map(_.nodes) === Array(Array("host3")))
+ }
}
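For reference, the createContainerRequest helper added to the suite above wraps YARN's ContainerRequest constructor. Below is a standalone sketch of building such a node-preferring request directly against the YARN client API; the memory, vcore and priority values are arbitrary placeholders for illustration, not the values the suite uses.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object ContainerRequestSketch {
  def main(args: Array[String]): Unit = {
    // Arbitrary container size and priority, for illustration only.
    val resource = Resource.newInstance(1024 /* memory MB */, 1 /* vcores */)
    val priority = Priority.newInstance(1)

    // Prefer host2 or host3; racks are left null, as in the test helper above.
    val request = new ContainerRequest(resource, Array("host2", "host3"), null, priority)
    println(request.getNodes) // [host2, host3]
  }
}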
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5d05f514ad..bd80036c5c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,7 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -134,7 +134,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
@@ -154,7 +154,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -174,11 +174,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
@@ -189,18 +189,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("decrease total requested executors to less than currently running") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (3)
+ handler.getPendingAllocate.size should be (3)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -210,7 +210,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.updateResourceRequests()
- handler.getNumPendingAllocate should be (0)
+ handler.getPendingAllocate.size should be (0)
handler.getNumExecutorsRunning should be (2)
}
@@ -218,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -233,14 +233,14 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (1)
+ handler.getPendingAllocate.size should be (1)
}
test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (4)
+ handler.getPendingAllocate.size should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -255,7 +255,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.processCompletedContainers(statuses.toSeq)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
- handler.getNumPendingAllocate should be (2)
+ handler.getPendingAllocate.size should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}