Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                           |   5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala | 182
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                               |  47
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala             | 125
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                          |  14
5 files changed, 354 insertions(+), 19 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 83dafa4a12..44acc7374d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -555,11 +555,12 @@ private[spark] class ApplicationMaster(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal) =>
+ case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
allocatorLock.synchronized {
- if (a.requestTotalExecutors(requestedTotal)) {
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
allocatorLock.notifyAll()
}
}
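
For context, the RequestExecutors message matched above now carries the locality hints next to the requested total. Its definition lives outside this diff (in the scheduler's CoarseGrainedClusterMessages); the following is only a sketch of the shape implied by the pattern match in this hunk, not part of the patch:

    // Sketch of the message shape implied by the pattern match above (illustrative only).
    case class RequestExecutors(
        requestedTotal: Int,                     // desired total number of executors
        localityAwareTasks: Int,                 // pending tasks that have locality preferences
        hostToLocalTaskCount: Map[String, Int])  // preferred host -> number of tasks preferring it
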
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
new file mode 100644
index 0000000000..081780204e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing containers. The goal of this algorithm is to maximize the number of tasks
+ * that would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
+ * and 10 tasks that require (host1, host2, host4), where each container has 2 cores
+ * and each task requires 1 cpu, so the required number of containers is 15
+ * and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If the requested container number (18) is more than the required container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preference.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no
+ * locality preference.
+ *
+ * 2. If the requested container number (10) is less than or equal to the required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, which is close to the expected ratio (3 : 3 : 2 : 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow method 1 or 2 above.
+ *
+ * 4. If containers exist and some of them can match the requested localities.
+ * For example, if we have 1 container on each node (host1: 1, host2: 1, host3: 1, host4: 1)
+ * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * then the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers in total.
+ *
+ * 4.1 If the requested container number (18) is more than the newly required containers (12),
+ * follow method 1 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ * 4.2 If the requested container number (10) is less than the newly required containers (12),
+ * follow method 2 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and their localities can fully cover the requested localities.
+ * For example, if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
+ * which already covers the currently requested localities, this algorithm will allocate all the
+ * requested containers with no locality preference.
+ */
+private[yarn] class LocalityPreferredContainerPlacementStrategy(
+ val sparkConf: SparkConf,
+ val yarnConf: Configuration,
+ val resource: Resource) {
+
+ // Number of CPUs per task
+ private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
+
+ /**
+ * Calculate each container's node locality and rack locality.
+ * @param numContainer number of containers to calculate
+ * @param numLocalityAwareTasks number of tasks that have locality requirements
+ * @param hostToLocalTaskCount a map of preferred hostname to the number of tasks that may
+ * run on it, used as hints for container allocation
+ * @return node localities and rack localities; each locality is an array of strings, and
+ * the number of localities is the same as the number of containers
+ */
+ def localityOfRequestedContainers(
+ numContainer: Int,
+ numLocalityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int],
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ ): Array[ContainerLocalityPreferences] = {
+ val updatedHostToContainerCount = expectedHostToContainerCount(
+ numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap)
+ val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
+
+ // The number of containers to allocate, divided into two groups, one with preferred locality,
+ // and the other without locality preference.
+ val requiredLocalityFreeContainerNum =
+ math.max(0, numContainer - updatedLocalityAwareContainerNum)
+ val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
+
+ val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
+ if (requiredLocalityFreeContainerNum > 0) {
+ for (i <- 0 until requiredLocalityFreeContainerNum) {
+ containerLocalityPreferences += ContainerLocalityPreferences(
+ null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
+ }
+ }
+
+ if (requiredLocalityAwareContainerNum > 0) {
+ val largestRatio = updatedHostToContainerCount.values.max
+ // Normalize the ratio of each preferred host against the number of locality-aware
+ // containers required; this is used to compute the locality-preferred hosts below.
+ var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+ val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+ adjustedRatio.ceil.toInt
+ }
+
+ for (i <- 0 until requiredLocalityAwareContainerNum) {
+ // Only keep the hosts whose ratio is larger than 0, which means the host can still
+ // be assigned to a new container request.
+ val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+ val racks = hosts.map { h =>
+ RackResolver.resolve(yarnConf, h).getNetworkLocation
+ }.toSet
+ containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
+
+ // Decrement the ratio each time a host is used. Once a host's ratio reaches 0,
+ // its requirement is satisfied and it will not be assigned to further requests.
+ preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+ }
+ }
+
+ containerLocalityPreferences.toArray
+ }
+
+ /**
+ * Calculate the number of executors needed to satisfy the given number of pending tasks.
+ */
+ private def numExecutorsPending(numTasksPending: Int): Int = {
+ val coresPerExecutor = resource.getVirtualCores
+ (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+ }
+
+ /**
+ * Calculate the expected number of containers per host, taking allocated containers into account.
+ * @param localityAwareTasks number of locality-aware tasks
+ * @param hostToLocalTaskCount a map of preferred hostname to the number of tasks that may
+ * run on it, used as hints for container allocation
+ * @return a map with hostname as key and the required number of containers on that host as value
+ */
+ private def expectedHostToContainerCount(
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int],
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ ): Map[String, Int] = {
+ val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+ hostToLocalTaskCount.map { case (host, count) =>
+ val expectedCount =
+ count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+ val existedCount = allocatedHostToContainersMap.get(host)
+ .map(_.size)
+ .getOrElse(0)
+
+ // If the existing containers cannot fully satisfy the expected number of containers,
+ // the required container number is the expected count minus the existing count.
+ // Otherwise the required container number is 0.
+ (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
+ }
+ }
+}
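
As a sanity check on the example in the class comment above (20 tasks on (host1, host2, host3) plus 10 tasks on (host1, host2, host4), 2 cores per container, 1 cpu per task), the following standalone Scala sketch reproduces the container math by hand; the values 2 and 1 are assumptions taken from that comment, and the snippet is not part of the patch:

    // Worked example of the container math described in the class comment (illustration only).
    val coresPerExecutor = 2                     // assumed executor cores from the class comment
    val cpusPerTask = 1                          // assumed spark.task.cpus
    val numLocalityAwareTasks = 30               // 20 + 10 pending tasks with locality preferences
    val hostToLocalTaskCount =
      Map("host1" -> 30, "host2" -> 30, "host3" -> 20, "host4" -> 10)

    // Same ceiling division as numExecutorsPending: (30 * 1 + 2 - 1) / 2 = 15 containers.
    val requiredContainers =
      (numLocalityAwareTasks * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor

    // Expected containers per host, mirroring expectedHostToContainerCount when no containers
    // have been allocated yet: count * requiredContainers / totalTasks, rounded up.
    val totalTasks = hostToLocalTaskCount.values.sum   // 90
    val expected = hostToLocalTaskCount.map { case (host, count) =>
      host -> math.ceil(count.toDouble * requiredContainers / totalTasks).toInt
    }
    // expected == Map(host1 -> 5, host2 -> 5, host3 -> 4, host4 -> 2), matching case 4 above.
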
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 940873fbd0..6c103394af 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -96,7 +96,7 @@ private[yarn] class YarnAllocator(
// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executor
- private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
+ private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -127,6 +127,16 @@ private[yarn] class YarnAllocator(
}
}
+ // A map of preferred hostname to the number of tasks that may run on it.
+ private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+ // Number of tasks that have locality preferences in active stages
+ private var numLocalityAwareTasks: Int = 0
+
+ // A container placement strategy based on pending tasks' locality preference
+ private[yarn] val containerPlacementStrategy =
+ new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+
def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -146,10 +156,19 @@ private[yarn] class YarnAllocator(
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
- *
+ * @param requestedTotal total number of containers requested
+ * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
+ * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+ * container placement hint.
* @return Whether the new requested total is different than the old value.
*/
- def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
+ def requestTotalExecutorsWithPreferredLocalities(
+ requestedTotal: Int,
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+ this.numLocalityAwareTasks = localityAwareTasks
+ this.hostToLocalTaskCounts = hostToLocalTaskCount
+
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
@@ -221,12 +240,20 @@ private[yarn] class YarnAllocator(
val numPendingAllocate = getNumPendingAllocate
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+ // TODO: Consider the locality preferences of pending container requests.
+ // Since the last time we made container requests, stages may have completed and new stages
+ // may have been submitted, so the localities at which we requested our pending executors
+ // may no longer match our current needs. We should consider removing all outstanding
+ // container requests and adding them anew each time to avoid this.
if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
- for (i <- 0 until missing) {
- val request = createContainerRequest(resource)
+ val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
+ missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap)
+
+ for (locality <- containerLocalityPreferences) {
+ val request = createContainerRequest(resource, locality.nodes, locality.racks)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -249,11 +276,14 @@ private[yarn] class YarnAllocator(
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
- private def createContainerRequest(resource: Resource): ContainerRequest = {
+ protected def createContainerRequest(
+ resource: Resource,
+ nodes: Array[String],
+ racks: Array[String]): ContainerRequest = {
nodeLabelConstructor.map { constructor =>
- constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+ constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
labelExpression.orNull)
- }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+ }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
}
/**
@@ -437,7 +467,6 @@ private[yarn] class YarnAllocator(
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
-
}
private object YarnAllocator {
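
With the change above, updateResourceRequests issues one ContainerRequest per entry of containerLocalityPreferences instead of one undifferentiated request per missing executor. Ignoring the node-label reflection path, the non-label branch boils down to the standard YARN constructor; a minimal sketch under assumed imports, not part of the patch:

    // Illustration of the non-label branch of createContainerRequest above.
    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    def simpleContainerRequest(
        resource: Resource,
        nodes: Array[String],   // null means no node preference
        racks: Array[String],   // null means no rack preference
        priority: Priority): ContainerRequest =
      // relaxLocality defaults to true, so YARN may still place the container off these nodes
      new ContainerRequest(resource, nodes, racks, priority)
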
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
new file mode 100644
index 0000000000..b7fe4ccc67
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
+
+ private val yarnAllocatorSuite = new YarnAllocatorSuite
+ import yarnAllocatorSuite._
+
+ override def beforeEach() {
+ yarnAllocatorSuite.beforeEach()
+ }
+
+ override def afterEach() {
+ yarnAllocatorSuite.afterEach()
+ }
+
+ test("allocate locality preferred containers with enough resource and no matching existing " +
+ "containers") {
+ // 1. None of the current containers' locations can satisfy the new requirements.
+ // 2. The currently requested number of containers can fully satisfy the pending tasks.
+
+ val handler = createAllocator(2)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(
+ Array("host3", "host4", "host5"),
+ Array("host3", "host4", "host5"),
+ Array("host3", "host4")))
+ }
+
+ test("allocate locality preferred containers with enough resource and partially matched " +
+ "containers") {
+ // 1. Some of the current containers' locations can satisfy the new requirements.
+ // 2. The currently requested number of containers can fully satisfy the pending tasks.
+
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) ===
+ Array(null, Array("host2", "host3"), Array("host2", "host3")))
+ }
+
+ test("allocate locality preferred containers with limited resource and partially matched " +
+ "containers") {
+ // 1. Some of the current containers' locations can satisfy the new requirements.
+ // 2. The currently requested number of containers cannot fully satisfy the pending tasks.
+
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
+ }
+
+ test("allocate locality preferred containers with fully matched containers") {
+ // Current containers' locations can fully satisfy the new requirements
+
+ val handler = createAllocator(5)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2"),
+ createContainer("host2"),
+ createContainer("host3")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(null, null, null))
+ }
+
+ test("allocate containers with no locality preference") {
+ // Request a new container without any locality preference
+
+ val handler = createAllocator(2)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 0, Map.empty, handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(null))
+ }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 7509000771..37a789fcd3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkFunSuite}
import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.scheduler.SplitInfo
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
class MockResolver extends DNSToSwitchMapping {
override def resolve(names: JList[String]): JList[String] = {
@@ -171,7 +170,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
- handler.requestTotalExecutors(3)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)
@@ -182,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
- handler.requestTotalExecutors(2)
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (1)
}
@@ -193,7 +192,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
- handler.requestTotalExecutors(3)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)
@@ -203,7 +202,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (2)
- handler.requestTotalExecutors(1)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (0)
handler.getNumExecutorsRunning should be (2)
@@ -219,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutors(1)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
val statuses = Seq(container1, container2).map { c =>
@@ -241,5 +240,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
-
}