diff options
author | jerryshao <saisai.shao@intel.com> | 2015-07-27 15:46:35 -0700 |
---|---|---|
committer | Sandy Ryza <sandy@cloudera.com> | 2015-07-27 15:46:35 -0700 |
commit | ab625956616664c2b4861781a578311da75a9ae4 (patch) | |
tree | 314bc1f84b5a5794676c362b822414e47d190da2 /yarn/src | |
parent | 2104931d7d726eda2c098e0f403c7f1533df8746 (diff) | |
download | spark-ab625956616664c2b4861781a578311da75a9ae4.tar.gz spark-ab625956616664c2b4861781a578311da75a9ae4.tar.bz2 spark-ab625956616664c2b4861781a578311da75a9ae4.zip |
[SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic allocation requests
Currently there's no locality preference for container request in YARN mode, this will affect the performance if fetching data remotely, so here proposed to add locality in Yarn dynamic allocation mode.
Ping sryza, please help to review, thanks a lot.
Author: jerryshao <saisai.shao@intel.com>
Closes #6394 from jerryshao/SPARK-4352 and squashes the following commits:
d45fecb [jerryshao] Add documents
6c3fe5c [jerryshao] Fix bug
8db6c0e [jerryshao] Further address the comments
2e2b2cb [jerryshao] Fix rebase compiling problem
ce5f096 [jerryshao] Fix style issue
7f7df95 [jerryshao] Fix rebase issue
9ca9e07 [jerryshao] Code refactor according to comments
d3e4236 [jerryshao] Further address the comments
5e7a593 [jerryshao] Fix bug introduced code rebase
9ca7783 [jerryshao] Style changes
08317f9 [jerryshao] code and comment refines
65b2423 [jerryshao] Further address the comments
a27c587 [jerryshao] address the comment
27faabc [jerryshao] redundant code remove
9ce06a1 [jerryshao] refactor the code
f5ba27b [jerryshao] Style fix
2c6cc8a [jerryshao] Fix bug and add unit tests
0757335 [jerryshao] Consider the distribution of existed containers to recalculate the new container requests
0ad66ff [jerryshao] Fix compile bugs
1c20381 [jerryshao] Minor fix
5ef2dc8 [jerryshao] Add docs and improve the code
3359814 [jerryshao] Fix rebase and test bugs
0398539 [jerryshao] reinitialize the new implementation
67596d6 [jerryshao] Still fix the code
654e1d2 [jerryshao] Fix some bugs
45b1c89 [jerryshao] Further polish the algorithm
dea0152 [jerryshao] Enable node locality information in YarnAllocator
74bbcc6 [jerryshao] Support node locality for dynamic allocation initial commit
Diffstat (limited to 'yarn/src')
5 files changed, 354 insertions, 19 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 83dafa4a12..44acc7374d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -555,11 +555,12 @@ private[spark] class ApplicationMaster( } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RequestExecutors(requestedTotal) => + case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) => Option(allocator) match { case Some(a) => allocatorLock.synchronized { - if (a.requestTotalExecutors(requestedTotal)) { + if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal, + localityAwareTasks, hostToLocalTaskCount)) { allocatorLock.notifyAll() } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala new file mode 100644 index 0000000000..081780204e --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.{ArrayBuffer, HashMap, Set} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records.{ContainerId, Resource} +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existing containers. The target of this algorithm is to maximize the number of tasks that + * would run locally. + * + * Consider a situation in which we have 20 tasks that require (host1, host2, host3) + * and 10 tasks that require (host1, host2, host4), besides each container has 2 cores + * and cpus per task is 1, so the required container number is 15, + * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality + * preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers exist but none of them can match the requested localities, + * follow the method of 1 and 2. + * + * 4. If containers exist and some of them can match the requested localities. + * For example if we have 1 containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), + * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers by total. + * + * 4.1 If requested container number (18) is more than newly required containers (12). Follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If request container number (10) is more than newly required containers (12). Follow + * method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers exist and existing localities can fully cover the requested localities. + * For example if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5), + * which could cover the current requested localities. This algorithm will allocate all the + * requested containers with no localities. + */ +private[yarn] class LocalityPreferredContainerPlacementStrategy( + val sparkConf: SparkConf, + val yarnConf: Configuration, + val resource: Resource) { + + // Number of CPUs per task + private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1) + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwareTasks number of locality required tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwareTasks: Int, + hostToLocalTaskCount: Map[String, Int], + allocatedHostToContainersMap: HashMap[String, Set[ContainerId]] + ): Array[ContainerLocalityPreferences] = { + val updatedHostToContainerCount = expectedHostToContainerCount( + numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap) + val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum + + // The number of containers to allocate, divided into two groups, one with preferred locality, + // and the other without locality preference. + val requiredLocalityFreeContainerNum = + math.max(0, numContainer - updatedLocalityAwareContainerNum) + val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum + + val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]() + if (requiredLocalityFreeContainerNum > 0) { + for (i <- 0 until requiredLocalityFreeContainerNum) { + containerLocalityPreferences += ContainerLocalityPreferences( + null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]]) + } + } + + if (requiredLocalityAwareContainerNum > 0) { + val largestRatio = updatedHostToContainerCount.values.max + // Round the ratio of preferred locality to the number of locality required container + // number, which is used for locality preferred host calculating. + var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio => + val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio + adjustedRatio.ceil.toInt + } + + for (i <- 0 until requiredLocalityAwareContainerNum) { + // Only filter out the ratio which is larger than 0, which means the current host can + // still be allocated with new container request. + val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray + val racks = hosts.map { h => + RackResolver.resolve(yarnConf, h).getNetworkLocation + }.toSet + containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray) + + // Minus 1 each time when the host is used. When the current ratio is 0, + // which means all the required ratio is satisfied, this host will not be allocated again. + preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1) + } + } + + containerLocalityPreferences.toArray + } + + /** + * Calculate the number of executors need to satisfy the given number of pending tasks. + */ + private def numExecutorsPending(numTasksPending: Int): Int = { + val coresPerExecutor = resource.getVirtualCores + (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor + } + + /** + * Calculate the expected host to number of containers by considering with allocated containers. + * @param localityAwareTasks number of locality aware tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return a map with hostname as key and required number of containers on this host as value + */ + private def expectedHostToContainerCount( + localityAwareTasks: Int, + hostToLocalTaskCount: Map[String, Int], + allocatedHostToContainersMap: HashMap[String, Set[ContainerId]] + ): Map[String, Int] = { + val totalLocalTaskNum = hostToLocalTaskCount.values.sum + hostToLocalTaskCount.map { case (host, count) => + val expectedCount = + count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum + val existedCount = allocatedHostToContainersMap.get(host) + .map(_.size) + .getOrElse(0) + + // If existing container can not fully satisfy the expected number of container, + // the required container number is expected count minus existed count. Otherwise the + // required container number is 0. + (host, math.max(0, (expectedCount - existedCount).ceil.toInt)) + } + } +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd0..6c103394af 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -96,7 +96,7 @@ private[yarn] class YarnAllocator( // Number of cores per executor. protected val executorCores = args.executorCores // Resource capability requested for each executors - private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) + private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue @@ -127,6 +127,16 @@ private[yarn] class YarnAllocator( } } + // A map to store preferred hostname and possible task numbers running on it. + private var hostToLocalTaskCounts: Map[String, Int] = Map.empty + + // Number of tasks that have locality preferences in active stages + private var numLocalityAwareTasks: Int = 0 + + // A container placement strategy based on pending tasks' locality preference + private[yarn] val containerPlacementStrategy = + new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource) + def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = numExecutorsFailed @@ -146,10 +156,19 @@ private[yarn] class YarnAllocator( * Request as many executors from the ResourceManager as needed to reach the desired total. If * the requested total is smaller than the current number of running executors, no executors will * be killed. - * + * @param requestedTotal total number of containers requested + * @param localityAwareTasks number of locality aware tasks to be used as container placement hint + * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as + * container placement hint. * @return Whether the new requested total is different than the old value. */ - def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized { + def requestTotalExecutorsWithPreferredLocalities( + requestedTotal: Int, + localityAwareTasks: Int, + hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized { + this.numLocalityAwareTasks = localityAwareTasks + this.hostToLocalTaskCounts = hostToLocalTaskCount + if (requestedTotal != targetNumExecutors) { logInfo(s"Driver requested a total number of $requestedTotal executor(s).") targetNumExecutors = requestedTotal @@ -221,12 +240,20 @@ private[yarn] class YarnAllocator( val numPendingAllocate = getNumPendingAllocate val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning + // TODO. Consider locality preferences of pending container requests. + // Since the last time we made container requests, stages have completed and been submitted, + // and that the localities at which we requested our pending executors + // no longer apply to our current needs. We should consider to remove all outstanding + // container requests and add requests anew each time to avoid this. if (missing > 0) { logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " + s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") - for (i <- 0 until missing) { - val request = createContainerRequest(resource) + val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( + missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap) + + for (locality <- containerLocalityPreferences) { + val request = createContainerRequest(resource, locality.nodes, locality.racks) amClient.addContainerRequest(request) val nodes = request.getNodes val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last @@ -249,11 +276,14 @@ private[yarn] class YarnAllocator( * Creates a container request, handling the reflection required to use YARN features that were * added in recent versions. */ - private def createContainerRequest(resource: Resource): ContainerRequest = { + protected def createContainerRequest( + resource: Resource, + nodes: Array[String], + racks: Array[String]): ContainerRequest = { nodeLabelConstructor.map { constructor => - constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean, + constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean, labelExpression.orNull) - }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)) + }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)) } /** @@ -437,7 +467,6 @@ private[yarn] class YarnAllocator( releasedContainers.add(container.getId()) amClient.releaseAssignedContainer(container.getId()) } - } private object YarnAllocator { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala new file mode 100644 index 0000000000..b7fe4ccc67 --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite + import yarnAllocatorSuite._ + + override def beforeEach() { + yarnAllocatorSuite.beforeEach() + } + + override def afterEach() { + yarnAllocatorSuite.afterEach() + } + + test("allocate locality preferred containers with enough resource and no matched existed " + + "containers") { + // 1. All the locations of current containers cannot satisfy the new requirements + // 2. Current requested container number can fully satisfy the pending tasks. + + val handler = createAllocator(2) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap) + + assert(localities.map(_.nodes) === Array( + Array("host3", "host4", "host5"), + Array("host3", "host4", "host5"), + Array("host3", "host4"))) + } + + test("allocate locality preferred containers with enough resource and partially matched " + + "containers") { + // 1. Parts of current containers' locations can satisfy the new requirements + // 2. Current requested container number can fully satisfy the pending tasks. + + val handler = createAllocator(3) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") + )) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + + assert(localities.map(_.nodes) === + Array(null, Array("host2", "host3"), Array("host2", "host3"))) + } + + test("allocate locality preferred containers with limited resource and partially matched " + + "containers") { + // 1. Parts of current containers' locations can satisfy the new requirements + // 2. Current requested container number cannot fully satisfy the pending tasks. + + val handler = createAllocator(3) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") + )) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + + assert(localities.map(_.nodes) === Array(Array("host2", "host3"))) + } + + test("allocate locality preferred containers with fully matched containers") { + // Current containers' locations can fully satisfy the new requirements + + val handler = createAllocator(5) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2"), + createContainer("host2"), + createContainer("host3") + )) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap) + + assert(localities.map(_.nodes) === Array(null, null, null)) + } + + test("allocate containers with no locality preference") { + // Request new container without locality preference + + val handler = createAllocator(2) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) + + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( + 1, 0, Map.empty, handler.allocatedHostToContainersMap) + + assert(localities.map(_.nodes) === Array(null)) + } +} diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7509000771..37a789fcd3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkFunSuite} import org.apache.spark.SparkConf @@ -32,8 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.YarnAllocator._ import org.apache.spark.scheduler.SplitInfo -import org.scalatest.{BeforeAndAfterEach, Matchers} - class MockResolver extends DNSToSwitchMapping { override def resolve(names: JList[String]): JList[String] = { @@ -171,7 +170,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (4) - handler.requestTotalExecutors(3) + handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) handler.updateResourceRequests() handler.getNumPendingAllocate should be (3) @@ -182,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) - handler.requestTotalExecutors(2) + handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty) handler.updateResourceRequests() handler.getNumPendingAllocate should be (1) } @@ -193,7 +192,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (4) - handler.requestTotalExecutors(3) + handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) handler.updateResourceRequests() handler.getNumPendingAllocate should be (3) @@ -203,7 +202,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumExecutorsRunning should be (2) - handler.requestTotalExecutors(1) + handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty) handler.updateResourceRequests() handler.getNumPendingAllocate should be (0) handler.getNumExecutorsRunning should be (2) @@ -219,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val container2 = createContainer("host2") handler.handleAllocatedContainers(Array(container1, container2)) - handler.requestTotalExecutors(1) + handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty) handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) } val statuses = Seq(container1, container2).map { c => @@ -241,5 +240,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) } - } |