aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-07-27 15:46:35 -0700
committerSandy Ryza <sandy@cloudera.com>2015-07-27 15:46:35 -0700
commitab625956616664c2b4861781a578311da75a9ae4 (patch)
tree314bc1f84b5a5794676c362b822414e47d190da2 /yarn
parent2104931d7d726eda2c098e0f403c7f1533df8746 (diff)
downloadspark-ab625956616664c2b4861781a578311da75a9ae4.tar.gz
spark-ab625956616664c2b4861781a578311da75a9ae4.tar.bz2
spark-ab625956616664c2b4861781a578311da75a9ae4.zip
[SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic allocation requests
Currently there's no locality preference for container request in YARN mode, this will affect the performance if fetching data remotely, so here proposed to add locality in Yarn dynamic allocation mode. Ping sryza, please help to review, thanks a lot. Author: jerryshao <saisai.shao@intel.com> Closes #6394 from jerryshao/SPARK-4352 and squashes the following commits: d45fecb [jerryshao] Add documents 6c3fe5c [jerryshao] Fix bug 8db6c0e [jerryshao] Further address the comments 2e2b2cb [jerryshao] Fix rebase compiling problem ce5f096 [jerryshao] Fix style issue 7f7df95 [jerryshao] Fix rebase issue 9ca9e07 [jerryshao] Code refactor according to comments d3e4236 [jerryshao] Further address the comments 5e7a593 [jerryshao] Fix bug introduced code rebase 9ca7783 [jerryshao] Style changes 08317f9 [jerryshao] code and comment refines 65b2423 [jerryshao] Further address the comments a27c587 [jerryshao] address the comment 27faabc [jerryshao] redundant code remove 9ce06a1 [jerryshao] refactor the code f5ba27b [jerryshao] Style fix 2c6cc8a [jerryshao] Fix bug and add unit tests 0757335 [jerryshao] Consider the distribution of existed containers to recalculate the new container requests 0ad66ff [jerryshao] Fix compile bugs 1c20381 [jerryshao] Minor fix 5ef2dc8 [jerryshao] Add docs and improve the code 3359814 [jerryshao] Fix rebase and test bugs 0398539 [jerryshao] reinitialize the new implementation 67596d6 [jerryshao] Still fix the code 654e1d2 [jerryshao] Fix some bugs 45b1c89 [jerryshao] Further polish the algorithm dea0152 [jerryshao] Enable node locality information in YarnAllocator 74bbcc6 [jerryshao] Support node locality for dynamic allocation initial commit
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala182
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala47
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala125
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala14
5 files changed, 354 insertions, 19 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 83dafa4a12..44acc7374d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -555,11 +555,12 @@ private[spark] class ApplicationMaster(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RequestExecutors(requestedTotal) =>
+ case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
allocatorLock.synchronized {
- if (a.requestTotalExecutors(requestedTotal)) {
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
allocatorLock.notifyAll()
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
new file mode 100644
index 0000000000..081780204e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, number of required cores/containers and and locality of current
+ * existing containers. The target of this algorithm is to maximize the number of tasks that
+ * would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
+ * and 10 tasks that require (host1, host2, host4), besides each container has 2 cores
+ * and cpus per task is 1, so the required container number is 15,
+ * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow the method of 1 and 2.
+ *
+ * 4. If containers exist and some of them can match the requested localities.
+ * For example if we have 1 containers on each node (host1: 1, host2: 1: host3: 1, host4: 1),
+ * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ * 4.1 If requested container number (18) is more than newly required containers (12). Follow
+ * method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 4.2 If request container number (10) is more than newly required containers (12). Follow
+ * method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and existing localities can fully cover the requested localities.
+ * For example if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
+ * which could cover the current requested localities. This algorithm will allocate all the
+ * requested containers with no localities.
+ */
+private[yarn] class LocalityPreferredContainerPlacementStrategy(
+ val sparkConf: SparkConf,
+ val yarnConf: Configuration,
+ val resource: Resource) {
+
+ // Number of CPUs per task
+ private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
+
+ /**
+ * Calculate each container's node locality and rack locality
+ * @param numContainer number of containers to calculate
+ * @param numLocalityAwareTasks number of locality required tasks
+ * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+ * numbers running on it, used as hints for container allocation
+ * @return node localities and rack localities, each locality is an array of string,
+ * the length of localities is the same as number of containers
+ */
+ def localityOfRequestedContainers(
+ numContainer: Int,
+ numLocalityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int],
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ ): Array[ContainerLocalityPreferences] = {
+ val updatedHostToContainerCount = expectedHostToContainerCount(
+ numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap)
+ val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
+
+ // The number of containers to allocate, divided into two groups, one with preferred locality,
+ // and the other without locality preference.
+ val requiredLocalityFreeContainerNum =
+ math.max(0, numContainer - updatedLocalityAwareContainerNum)
+ val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
+
+ val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
+ if (requiredLocalityFreeContainerNum > 0) {
+ for (i <- 0 until requiredLocalityFreeContainerNum) {
+ containerLocalityPreferences += ContainerLocalityPreferences(
+ null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
+ }
+ }
+
+ if (requiredLocalityAwareContainerNum > 0) {
+ val largestRatio = updatedHostToContainerCount.values.max
+ // Round the ratio of preferred locality to the number of locality required container
+ // number, which is used for locality preferred host calculating.
+ var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+ val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+ adjustedRatio.ceil.toInt
+ }
+
+ for (i <- 0 until requiredLocalityAwareContainerNum) {
+ // Only filter out the ratio which is larger than 0, which means the current host can
+ // still be allocated with new container request.
+ val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+ val racks = hosts.map { h =>
+ RackResolver.resolve(yarnConf, h).getNetworkLocation
+ }.toSet
+ containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
+
+ // Minus 1 each time when the host is used. When the current ratio is 0,
+ // which means all the required ratio is satisfied, this host will not be allocated again.
+ preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+ }
+ }
+
+ containerLocalityPreferences.toArray
+ }
+
+ /**
+ * Calculate the number of executors need to satisfy the given number of pending tasks.
+ */
+ private def numExecutorsPending(numTasksPending: Int): Int = {
+ val coresPerExecutor = resource.getVirtualCores
+ (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+ }
+
+ /**
+ * Calculate the expected host to number of containers by considering with allocated containers.
+ * @param localityAwareTasks number of locality aware tasks
+ * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+ * numbers running on it, used as hints for container allocation
+ * @return a map with hostname as key and required number of containers on this host as value
+ */
+ private def expectedHostToContainerCount(
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int],
+ allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+ ): Map[String, Int] = {
+ val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+ hostToLocalTaskCount.map { case (host, count) =>
+ val expectedCount =
+ count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+ val existedCount = allocatedHostToContainersMap.get(host)
+ .map(_.size)
+ .getOrElse(0)
+
+ // If existing container can not fully satisfy the expected number of container,
+ // the required container number is expected count minus existed count. Otherwise the
+ // required container number is 0.
+ (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
+ }
+ }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 940873fbd0..6c103394af 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -96,7 +96,7 @@ private[yarn] class YarnAllocator(
// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executors
- private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
+ private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -127,6 +127,16 @@ private[yarn] class YarnAllocator(
}
}
+ // A map to store preferred hostname and possible task numbers running on it.
+ private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+ // Number of tasks that have locality preferences in active stages
+ private var numLocalityAwareTasks: Int = 0
+
+ // A container placement strategy based on pending tasks' locality preference
+ private[yarn] val containerPlacementStrategy =
+ new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+
def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -146,10 +156,19 @@ private[yarn] class YarnAllocator(
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
- *
+ * @param requestedTotal total number of containers requested
+ * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
+ * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+ * container placement hint.
* @return Whether the new requested total is different than the old value.
*/
- def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
+ def requestTotalExecutorsWithPreferredLocalities(
+ requestedTotal: Int,
+ localityAwareTasks: Int,
+ hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+ this.numLocalityAwareTasks = localityAwareTasks
+ this.hostToLocalTaskCounts = hostToLocalTaskCount
+
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
@@ -221,12 +240,20 @@ private[yarn] class YarnAllocator(
val numPendingAllocate = getNumPendingAllocate
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+ // TODO. Consider locality preferences of pending container requests.
+ // Since the last time we made container requests, stages have completed and been submitted,
+ // and that the localities at which we requested our pending executors
+ // no longer apply to our current needs. We should consider to remove all outstanding
+ // container requests and add requests anew each time to avoid this.
if (missing > 0) {
logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
- for (i <- 0 until missing) {
- val request = createContainerRequest(resource)
+ val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
+ missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap)
+
+ for (locality <- containerLocalityPreferences) {
+ val request = createContainerRequest(resource, locality.nodes, locality.racks)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -249,11 +276,14 @@ private[yarn] class YarnAllocator(
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
- private def createContainerRequest(resource: Resource): ContainerRequest = {
+ protected def createContainerRequest(
+ resource: Resource,
+ nodes: Array[String],
+ racks: Array[String]): ContainerRequest = {
nodeLabelConstructor.map { constructor =>
- constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+ constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
labelExpression.orNull)
- }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+ }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
}
/**
@@ -437,7 +467,6 @@ private[yarn] class YarnAllocator(
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
-
}
private object YarnAllocator {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
new file mode 100644
index 0000000000..b7fe4ccc67
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
+
+ private val yarnAllocatorSuite = new YarnAllocatorSuite
+ import yarnAllocatorSuite._
+
+ override def beforeEach() {
+ yarnAllocatorSuite.beforeEach()
+ }
+
+ override def afterEach() {
+ yarnAllocatorSuite.afterEach()
+ }
+
+ test("allocate locality preferred containers with enough resource and no matched existed " +
+ "containers") {
+ // 1. All the locations of current containers cannot satisfy the new requirements
+ // 2. Current requested container number can fully satisfy the pending tasks.
+
+ val handler = createAllocator(2)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(
+ Array("host3", "host4", "host5"),
+ Array("host3", "host4", "host5"),
+ Array("host3", "host4")))
+ }
+
+ test("allocate locality preferred containers with enough resource and partially matched " +
+ "containers") {
+ // 1. Parts of current containers' locations can satisfy the new requirements
+ // 2. Current requested container number can fully satisfy the pending tasks.
+
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) ===
+ Array(null, Array("host2", "host3"), Array("host2", "host3")))
+ }
+
+ test("allocate locality preferred containers with limited resource and partially matched " +
+ "containers") {
+ // 1. Parts of current containers' locations can satisfy the new requirements
+ // 2. Current requested container number cannot fully satisfy the pending tasks.
+
+ val handler = createAllocator(3)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
+ }
+
+ test("allocate locality preferred containers with fully matched containers") {
+ // Current containers' locations can fully satisfy the new requirements
+
+ val handler = createAllocator(5)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(
+ createContainer("host1"),
+ createContainer("host1"),
+ createContainer("host2"),
+ createContainer("host2"),
+ createContainer("host3")
+ ))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(null, null, null))
+ }
+
+ test("allocate containers with no locality preference") {
+ // Request new container without locality preference
+
+ val handler = createAllocator(2)
+ handler.updateResourceRequests()
+ handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+ val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+ 1, 0, Map.empty, handler.allocatedHostToContainersMap)
+
+ assert(localities.map(_.nodes) === Array(null))
+ }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 7509000771..37a789fcd3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkFunSuite}
import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.scheduler.SplitInfo
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
class MockResolver extends DNSToSwitchMapping {
override def resolve(names: JList[String]): JList[String] = {
@@ -171,7 +170,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
- handler.requestTotalExecutors(3)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)
@@ -182,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
- handler.requestTotalExecutors(2)
+ handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (1)
}
@@ -193,7 +192,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
- handler.requestTotalExecutors(3)
+ handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (3)
@@ -203,7 +202,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (2)
- handler.requestTotalExecutors(1)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.updateResourceRequests()
handler.getNumPendingAllocate should be (0)
handler.getNumExecutorsRunning should be (2)
@@ -219,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
- handler.requestTotalExecutors(1)
+ handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
val statuses = Seq(container1, container2).map { c =>
@@ -241,5 +240,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
-
}