author     Iulian Dragos <jaguarul@gmail.com>    2015-07-09 13:26:46 -0700
committer  Andrew Or <andrew@databricks.com>     2015-07-09 13:26:46 -0700
commit     c4830598b271cc6390d127bd4cf8ab02b28792e0 (patch)
tree       70a5de3fbcb4ad0f277d03f411871f80995b534c /core
parent     ebdf58538058e57381c04b6725d4be0c37847ed3 (diff)
[SPARK-6287] [MESOS] Add dynamic allocation to the coarse-grained Mesos scheduler
This is largely based on extracting the dynamic allocation parts from tnachen's #3861.

Author: Iulian Dragos <jaguarul@gmail.com>

Closes #4984 from dragos/issue/mesos-coarse-dynamicAllocation and squashes the following commits:

39df8cd [Iulian Dragos] Update tests to latest changes in core.
9d2c9fa [Iulian Dragos] Remove adjustment of executorLimitOption in doKillExecutors.
8b00f52 [Iulian Dragos] Latest round of reviews.
0cd00e0 [Iulian Dragos] Add persistent shuffle directory
15c45c1 [Iulian Dragos] Add dynamic allocation to the Spark coarse-grained scheduler.
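
A minimal sketch (not part of this commit) of the driver-side configuration this feature targets, assuming a coarse-grained Mesos cluster with the external shuffle service running on each slave; the master URL and application name are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("mesos://zk://host:2181/mesos")        // placeholder master URL
      .setAppName("dynamic-alloc-demo")                 // placeholder app name
      .set("spark.mesos.coarse", "true")                // use the coarse-grained backend
      .set("spark.dynamicAllocation.enabled", "true")   // let Spark grow and shrink executors
      .set("spark.shuffle.service.enabled", "true")     // shuffle files must outlive executors
    val sc = new SparkContext(conf)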
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                            |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala     | 136
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala             |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala                                |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                              |  45
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala | 175
6 files changed, 331 insertions, 56 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2547eeff2..82704b1ab2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_executorAllocationManager =
if (dynamicAllocationEnabled) {
assert(supportDynamicAllocation,
- "Dynamic allocation of executors is currently only supported in YARN mode")
+ "Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
@@ -853,7 +853,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions).setName(path)
}
-
/**
* :: Experimental ::
*
@@ -1364,10 +1363,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Return whether dynamically adjusting the amount of resources allocated to
- * this application is supported. This is currently only available for YARN.
+ * this application is supported. This is currently only available for YARN
+ * and Mesos coarse-grained mode.
*/
- private[spark] def supportDynamicAllocation =
- master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
+ private[spark] def supportDynamicAllocation: Boolean = {
+ (master.contains("yarn")
+ || master.contains("mesos")
+ || _conf.getBoolean("spark.dynamicAllocation.testing", false))
+ }
/**
* :: DeveloperApi ::
@@ -1385,7 +1388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
- "Requesting executors is currently only supported in YARN mode")
+ "Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
@@ -1403,7 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
- "Requesting executors is currently only supported in YARN mode")
+ "Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1421,7 +1424,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(supportDynamicAllocation,
- "Killing executors is currently only supported in YARN mode")
+ "Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
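
For reference, a hedged usage sketch of the DeveloperApi methods gated above, assuming sc runs against YARN or coarse-grained Mesos (on Mesos, executor IDs take the "slaveId/taskId" form introduced in this patch):

    // Ask the backend for two additional executors; returns false if unsupported
    sc.requestExecutors(2)
    // Release a specific executor by ID; on Mesos this kills the backing task
    sc.killExecutors(Seq("s1/0"))   // "s1/0" is a hypothetical executor ID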
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b68f8c7685..cbade13149 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,11 +18,14 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{List => JList}
+import java.util.{List => JList, Collections}
+import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
+import com.google.common.collect.HashBiMap
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
@@ -60,9 +63,27 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveIdsWithExecutors = new HashSet[String]
- val taskIdToSlaveId = new HashMap[Int, String]
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+ val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
+ // How many times tasks on each slave failed
+ val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+
+ /**
+ * The total number of executors we aim to have. Undefined when not using dynamic allocation
+ * and before the ExecutorAllocationManager calls [[doRequestTotalExecutors]].
+ */
+ private var executorLimitOption: Option[Int] = None
+
+ /**
+ * Return the current executor limit, which may be [[Int.MaxValue]]
+ * before it is properly initialized.
+ */
+ private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
+
+ private val pendingRemovedSlaveIds = new HashSet[String]
+ // Private lock object protecting mutable state above. Using the intrinsic lock
+ // may lead to deadlocks since the superclass might also try to lock it.
+ private val stateLock = new ReentrantLock
val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
@@ -86,7 +107,7 @@ private[spark] class CoarseMesosSchedulerBackend(
startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
}
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+ def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
@@ -120,10 +141,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = sc.env.rpcEnv.uriOf(
- SparkEnv.driverActorSystemName,
- RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val uri = conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
@@ -133,7 +150,7 @@ private[spark] class CoarseMesosSchedulerBackend(
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
- s" --driver-url $driverUrl" +
+ s" --driver-url $driverURL" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
@@ -142,11 +159,12 @@ private[spark] class CoarseMesosSchedulerBackend(
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
+ val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
- s" --driver-url $driverUrl" +
- s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --driver-url $driverURL" +
+ s" --executor-id $executorId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
@@ -155,6 +173,17 @@ private[spark] class CoarseMesosSchedulerBackend(
command.build()
}
+ protected def driverURL: String = {
+ if (conf.contains("spark.testing")) {
+ "driverURL"
+ } else {
+ sc.env.rpcEnv.uriOf(
+ SparkEnv.driverActorSystemName,
+ RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+ }
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
@@ -172,17 +201,18 @@ private[spark] class CoarseMesosSchedulerBackend(
* unless we've already launched more than we wanted to.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
+ stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
- val slaveId = offer.getSlaveId.toString
+ val slaveId = offer.getSlaveId.getValue
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
- if (meetsConstraints &&
+ if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
+ meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
@@ -197,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val task = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", calculateTotalMemory(sc)))
@@ -209,7 +239,9 @@ private[spark] class CoarseMesosSchedulerBackend(
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- d.launchTasks(List(offer.getId), List(task.build()), filters)
+ d.launchTasks(
+ Collections.singleton(offer.getId),
+ Collections.singleton(task.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -224,7 +256,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
- synchronized {
+ stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
@@ -242,8 +274,9 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
+ executorTerminated(d, slaveId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
- mesosDriver.reviveOffers()
+ d.reviveOffers()
}
}
}
@@ -262,18 +295,39 @@ private[spark] class CoarseMesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
- // Note that the slave ID corresponds to the executor ID on that slave
- slaveIdsWithExecutors -= slaveId.getValue
- removeExecutor(slaveId.getValue, "Mesos slave lost")
+ /**
+ * Called when a slave is lost or a Mesos task has finished. Updates the local view of
+ * what tasks are running, removes the terminated slave from the list of pending
+ * slave IDs that we might have asked to kill, and notifies the driver that
+ * an executor was removed.
+ */
+ private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+ stateLock.synchronized {
+ if (slaveIdsWithExecutors.contains(slaveId)) {
+ val slaveIdToTaskId = taskIdToSlaveId.inverse()
+ if (slaveIdToTaskId.contains(slaveId)) {
+ val taskId: Int = slaveIdToTaskId.get(slaveId)
+ taskIdToSlaveId.remove(taskId)
+ removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+ }
+ // TODO: This assumes one Spark executor per Mesos slave,
+ // which may no longer be true after SPARK-5095
+ pendingRemovedSlaveIds -= slaveId
+ slaveIdsWithExecutors -= slaveId
}
}
}
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+ private def sparkExecutorId(slaveId: String, taskId: String): String = {
+ s"$slaveId/$taskId"
+ }
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
+ }
+
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
}
@@ -284,4 +338,34 @@ private[spark] class CoarseMesosSchedulerBackend(
super.applicationId
}
+ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ // We don't truly know whether we can fulfill the full number of executors,
+ // since at coarse grain it depends on the number of slaves available.
+ logInfo("Capping the total number of executors to " + requestedTotal)
+ executorLimitOption = Some(requestedTotal)
+ true
+ }
+
+ override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ if (mesosDriver == null) {
+ logWarning("Asked to kill executors before the Mesos driver was started.")
+ return false
+ }
+
+ val slaveIdToTaskId = taskIdToSlaveId.inverse()
+ for (executorId <- executorIds) {
+ val slaveId = executorId.split("/")(0)
+ if (slaveIdToTaskId.contains(slaveId)) {
+ mesosDriver.killTask(
+ TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
+ pendingRemovedSlaveIds += slaveId
+ } else {
+ logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+ }
+ }
+ // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+ // the desired limit through a call to `doRequestTotalExecutors`.
+ // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+ true
+ }
}
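
The kill path above hinges on Guava's HashBiMap, which keeps the taskId -> slaveId mapping and its inverse in sync. A self-contained sketch of that lookup (the identifiers and values are illustrative, not from the commit):

    import com.google.common.collect.HashBiMap

    val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]()
    taskIdToSlaveId.put(0, "s1")                       // task 0 runs on slave s1

    val executorId = "s1/0"                            // Spark executor ID: "slaveId/taskId"
    val slaveId = executorId.split("/")(0)
    val slaveIdToTaskId = taskIdToSlaveId.inverse()    // O(1) reverse view, stays in sync
    if (slaveIdToTaskId.containsKey(slaveId)) {
      val taskId: Int = slaveIdToTaskId.get(slaveId)
      println(s"would kill Mesos task $taskId on slave $slaveId")
    }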
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d8a8c848bb..925702e63a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.util.control.NonFatal
import com.google.common.base.Splitter
-import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
import org.apache.mesos.Protos._
import org.apache.mesos.protobuf.GeneratedMessage
import org.apache.spark.{Logging, SparkContext}
@@ -39,7 +39,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
private final val registerLatch = new CountDownLatch(1)
// Driver for talking to Mesos
- protected var mesosDriver: MesosSchedulerDriver = null
+ protected var mesosDriver: SchedulerDriver = null
/**
* Starts the MesosSchedulerDriver with the provided information. This method returns
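
Widening mesosDriver from the concrete MesosSchedulerDriver to the SchedulerDriver interface is what lets the new test suite inject a mock. A minimal sketch of the pattern, assuming Mockito is on the classpath:

    import org.apache.mesos.SchedulerDriver
    import org.mockito.Mockito.mock

    // Stands in for a real Mesos driver; no cluster needed in tests
    val driver: SchedulerDriver = mock(classOf[SchedulerDriver])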
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 91ef86389a..5f537692a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -124,10 +124,16 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
(blockId, getFile(blockId))
}
+ /**
+ * Create local directories for storing block data. These directories are
+ * located inside configured local directories and won't
+ * be deleted on JVM exit when using the external shuffle service.
+ */
private def createLocalDirs(conf: SparkConf): Array[File] = {
- Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
+ Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
+ Utils.chmod700(localDir)
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 944560a913..b6b932104a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -733,7 +733,12 @@ private[spark] object Utils extends Logging {
localRootDirs
}
- private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
+ /**
+ * Return the configured local directories where Spark can write files. This
+ * method does not create any directories on its own; it only encapsulates the
+ * logic of locating the local directories according to deployment mode.
+ */
+ def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
@@ -749,27 +754,29 @@ private[spark] object Utils extends Logging {
Option(conf.getenv("SPARK_LOCAL_DIRS"))
.getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
.split(",")
- .flatMap { root =>
- try {
- val rootDir = new File(root)
- if (rootDir.exists || rootDir.mkdirs()) {
- val dir = createTempDir(root)
- chmod700(dir)
- Some(dir.getAbsolutePath)
- } else {
- logError(s"Failed to create dir in $root. Ignoring this directory.")
- None
- }
- } catch {
- case e: IOException =>
- logError(s"Failed to create local root dir in $root. Ignoring this directory.")
- None
- }
- }
- .toArray
}
}
+ private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
+ getConfiguredLocalDirs(conf).flatMap { root =>
+ try {
+ val rootDir = new File(root)
+ if (rootDir.exists || rootDir.mkdirs()) {
+ val dir = createTempDir(root)
+ chmod700(dir)
+ Some(dir.getAbsolutePath)
+ } else {
+ logError(s"Failed to create dir in $root. Ignoring this directory.")
+ None
+ }
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local root dir in $root. Ignoring this directory.")
+ None
+ }
+ }.toArray
+ }
+
/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
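
A sketch of the resolution order getConfiguredLocalDirs encapsulates: YARN container dirs when running in a YARN container, else SPARK_LOCAL_DIRS, else spark.local.dir, else the JVM temp dir. Note that Utils is private[spark], so this only compiles from within the org.apache.spark package; the path is hypothetical:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf().set("spark.local.dir", "/mnt/spark-local")
    // Outside YARN and with SPARK_LOCAL_DIRS unset, yields Array("/mnt/spark-local")
    val dirs: Array[String] = Utils.getConfiguredLocalDirs(conf)
    dirs.foreach(println)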
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
new file mode 100644
index 0000000000..3f1692917a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util
+import java.util.Collections
+
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.Protos._
+import org.apache.mesos.SchedulerDriver
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.Matchers
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+
+class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
+ with LocalSparkContext
+ with MockitoSugar
+ with BeforeAndAfter {
+
+ private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(mem))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(cpu))
+ builder.setId(OfferID.newBuilder()
+ .setValue(offerId).build())
+ .setFrameworkId(FrameworkID.newBuilder()
+ .setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+ .setHostname(s"host${slaveId}")
+ .build()
+ }
+
+ private def createSchedulerBackend(
+ taskScheduler: TaskSchedulerImpl,
+ driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+ val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
+ mesosDriver = driver
+ markRegistered()
+ }
+ backend.start()
+ backend
+ }
+
+ var sparkConf: SparkConf = _
+
+ before {
+ sparkConf = (new SparkConf)
+ .setMaster("local[*]")
+ .setAppName("test-mesos-dynamic-alloc")
+ .setSparkHome("/path")
+
+ sc = new SparkContext(sparkConf)
+ }
+
+ test("mesos supports killing and limiting executors") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+
+ sparkConf.set("spark.driver.host", "driverHost")
+ sparkConf.set("spark.driver.port", "1234")
+
+ val backend = createSchedulerBackend(taskScheduler, driver)
+ val minMem = backend.calculateTotalMemory(sc).toInt
+ val minCpu = 4
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(createOffer("o1", "s1", minMem, minCpu))
+
+ val taskID0 = TaskID.newBuilder().setValue("0").build()
+
+ backend.resourceOffers(driver, mesosOffers)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ any[util.Collection[TaskInfo]],
+ any[Filters])
+
+ // simulate the allocation manager down-scaling executors
+ backend.doRequestTotalExecutors(0)
+ assert(backend.doKillExecutors(Seq("s1/0")))
+ verify(driver, times(1)).killTask(taskID0)
+
+ val mesosOffers2 = new java.util.ArrayList[Offer]
+ mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu))
+ backend.resourceOffers(driver, mesosOffers2)
+
+ verify(driver, times(1))
+ .declineOffer(OfferID.newBuilder().setValue("o2").build())
+
+ // Verify we didn't launch any new executor
+ assert(backend.slaveIdsWithExecutors.size === 1)
+
+ backend.doRequestTotalExecutors(2)
+ backend.resourceOffers(driver, mesosOffers2)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)),
+ any[util.Collection[TaskInfo]],
+ any[Filters])
+
+ assert(backend.slaveIdsWithExecutors.size === 2)
+ backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build())
+ assert(backend.slaveIdsWithExecutors.size === 1)
+ }
+
+ test("mesos supports killing and relaunching tasks with executors") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+
+ val backend = createSchedulerBackend(taskScheduler, driver)
+ val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+ val minCpu = 4
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ val offer1 = createOffer("o1", "s1", minMem, minCpu)
+ mesosOffers.add(offer1)
+
+ val offer2 = createOffer("o2", "s1", minMem, 1)
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer1.getId)),
+ anyObject(),
+ anyObject[Filters])
+
+ // Simulate task killed, executor no longer running
+ val status = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue("0").build())
+ .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+ .setState(TaskState.TASK_KILLED)
+ .build
+
+ backend.statusUpdate(driver, status)
+ assert(!backend.slaveIdsWithExecutors.contains("s1"))
+
+ mesosOffers.clear()
+ mesosOffers.add(offer2)
+ backend.resourceOffers(driver, mesosOffers)
+ assert(backend.slaveIdsWithExecutors.contains("s1"))
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer2.getId)),
+ anyObject(),
+ anyObject[Filters])
+
+ verify(driver, times(1)).reviveOffers()
+ }
+}