author     Michael Gummelt <mgummelt@mesosphere.io>    2016-02-10 10:53:33 -0800
committer  Andrew Or <andrew@databricks.com>           2016-02-10 10:53:33 -0800
commit     80cb963ad963e26c3a7f8388bdd4ffd5e99aad1a
tree       ebd19083ee53f66618a81280c6b6667db129b179
parent     c0b71e0b8f3c068f2f092bb118a16611b3d38d7a
[SPARK-5095][MESOS] Support launching multiple mesos executors in coarse grained mesos mode.
This is the next iteration of tnachen's previous PR: https://github.com/apache/spark/pull/4027

In that PR, we resolved with andrewor14 and pwendell to implement the Mesos scheduler's support of `spark.executor.cores` so that it is consistent with YARN and Standalone. This PR implements that resolution.

This PR implements two high-level features. These two features are co-dependent, so they're both implemented here:
- Mesos support for spark.executor.cores
- Multiple executors per slave

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #10993 from mgummelt/executor_sizing.
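Usage note (illustrative, not part of this patch): with this change, executor sizing on coarse-grained Mesos follows the same `spark.executor.cores` convention as YARN and Standalone, and `spark.mesos.extra.cores` now applies per executor. A minimal sketch, assuming a hypothetical Mesos master URL:

import org.apache.spark.{SparkConf, SparkContext}

// With these settings the backend can launch up to four 2-core executors, and may place
// more than one of them on a single slave if that slave's offer has enough CPU and memory.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk-host:2181/mesos")  // hypothetical Mesos master URL
  .setAppName("executor-sizing-example")
  .set("spark.mesos.coarse", "true")             // coarse-grained mode
  .set("spark.executor.cores", "2")              // cores per executor (now honored on Mesos)
  .set("spark.cores.max", "8")                   // cap on total cores across all executors
  .set("spark.mesos.extra.cores", "1")           // advertise one extra core per executor
val sc = new SparkContext(conf)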
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 375
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala | 365
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala | 6
-rw-r--r--  docs/configuration.md | 15
-rw-r--r--  docs/running-on-mesos.md | 8
9 files changed, 521 insertions(+), 275 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f69a3d371e..0a5b09dc0d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -240,6 +240,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
+
+ logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
+ s"${executorData.executorHost}.")
+
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
@@ -309,7 +313,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// TODO (prashant) send conf instead of properties
- driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+ driverEndpoint = createDriverEndpointRef(properties)
+ }
+
+ protected def createDriverEndpointRef(
+ properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
+ rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
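The change above splits driver-endpoint creation into an overridable createDriverEndpointRef so that subclasses (the new Mesos test suite below does this) can substitute a stub endpoint. A minimal, generic sketch of that test seam, with illustrative names rather than the real Spark classes:

object EndpointSeamSketch {
  // Production code calls a protected factory method instead of constructing the
  // collaborator inline, so a test subclass can swap in a stub.
  class Backend {
    def start(): String = createEndpointRef()
    protected def createEndpointRef(): String = "real-endpoint"
  }

  def main(args: Array[String]): Unit = {
    val testBackend = new Backend {
      override protected def createEndpointRef(): String = "stub-endpoint"
    }
    println(testBackend.start())  // prints "stub-endpoint"
  }
}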
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0a2d72f4dc..98699e0b29 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -23,17 +23,17 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.{Buffer, HashMap, HashSet}
import com.google.common.base.Stopwatch
-import com.google.common.collect.HashBiMap
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.{RpcEndpointAddress}
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -74,17 +74,13 @@ private[spark] class CoarseMesosSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
// Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[Int, Int]
+ val coresByTaskId = new HashMap[String, Int]
var totalCoresAcquired = 0
- val slaveIdsWithExecutors = new HashSet[String]
-
- // Maping from slave Id to hostname
- private val slaveIdToHost = new HashMap[String, String]
-
- val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
- // How many times tasks on each slave failed
- val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+ // SlaveID -> Slave
+ // This map accumulates entries for the duration of the job. Slaves are never deleted, because
+ // we need to maintain e.g. failure state and connection state.
+ private val slaves = new HashMap[String, Slave]
/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -105,13 +101,11 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
- private val pendingRemovedSlaveIds = new HashSet[String]
-
// private lock object protecting mutable state above. Using the intrinsic lock
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
+ val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
// Offer constraints
private val slaveOfferConstraints =
@@ -121,27 +115,31 @@ private[spark] class CoarseMesosSchedulerBackend(
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)
- // A client for talking to the external shuffle service, if it is a
+ // A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
- Some(new MesosExternalShuffleClient(
- SparkTransportConf.fromSparkConf(conf, "shuffle"),
- securityManager,
- securityManager.isAuthenticationEnabled(),
- securityManager.isSaslEncryptionEnabled()))
+ Some(getShuffleClient())
} else {
None
}
}
+ protected def getShuffleClient(): MesosExternalShuffleClient = {
+ new MesosExternalShuffleClient(
+ SparkTransportConf.fromSparkConf(conf, "shuffle"),
+ securityManager,
+ securityManager.isAuthenticationEnabled(),
+ securityManager.isSaslEncryptionEnabled())
+ }
+
var nextMesosTaskId = 0
@volatile var appId: String = _
- def newMesosTaskId(): Int = {
+ def newMesosTaskId(): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
- id
+ id.toString
}
override def start() {
@@ -156,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend(
startScheduler(driver)
}
- def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
+ def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
@@ -200,7 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
- s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
@@ -208,12 +206,11 @@ private[spark] class CoarseMesosSchedulerBackend(
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
- val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
command.setValue(
s"cd $basename*; $prefixEnv " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
- s" --executor-id $executorId" +
+ s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
@@ -268,113 +265,209 @@ private[spark] class CoarseMesosSchedulerBackend(
offers.asScala.map(_.getId).foreach(d.declineOffer)
return
}
- val filters = Filters.newBuilder().setRefuseSeconds(5).build()
- for (offer <- offers.asScala) {
+
+ logDebug(s"Received ${offers.size} resource offers.")
+
+ val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
val offerAttributes = toAttributeMap(offer.getAttributesList)
- val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ }
+
+ declineUnmatchedOffers(d, unmatchedOffers)
+ handleMatchedOffers(d, matchedOffers)
+ }
+ }
+
+ private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ for (offer <- offers) {
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+ val filters = Filters.newBuilder()
+ .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+ + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+ d.declineOffer(offer.getId, filters)
+ }
+ }
+
+ /**
+ * Launches executors on accepted offers, and declines unused offers. Executors are launched
+ * round-robin on offers.
+ *
+ * @param d SchedulerDriver
+ * @param offers Mesos offers that match attribute constraints
+ */
+ private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ val tasks = buildMesosTasks(offers)
+ for (offer <- offers) {
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val offerMem = getResource(offer.getResourcesList, "mem")
+ val offerCpus = getResource(offer.getResourcesList, "cpus")
+ val id = offer.getId.getValue
+
+ if (tasks.contains(offer.getId)) { // accept
+ val offerTasks = tasks(offer.getId)
+
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+ s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
+
+ for (task <- offerTasks) {
+ val taskId = task.getTaskId
+ val mem = getResource(task.getResourcesList, "mem")
+ val cpus = getResource(task.getResourcesList, "cpus")
+
+ logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+ }
+
+ d.launchTasks(
+ Collections.singleton(offer.getId),
+ offerTasks.asJava)
+ } else { // decline
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+ s"mem: $offerMem cpu: $offerCpus")
+
+ d.declineOffer(offer.getId)
+ }
+ }
+ }
+
+ /**
+ * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
+ * per-task memory and IO, tasks are round-robin assigned to offers.
+ *
+ * @param offers Mesos offers that match attribute constraints
+ * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+ */
+ private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+ // offerID -> tasks
+ val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+ // offerID -> resources
+ val remainingResources = mutable.Map(offers.map(offer =>
+ (offer.getId.getValue, offer.getResourcesList)): _*)
+
+ var launchTasks = true
+
+ // TODO(mgummelt): combine offers for a single slave
+ //
+ // round-robin create executors on the available offers
+ while (launchTasks) {
+ launchTasks = false
+
+ for (offer <- offers) {
val slaveId = offer.getSlaveId.getValue
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
- val id = offer.getId.getValue
- if (meetsConstraints) {
- if (taskIdToSlaveId.size < executorLimit &&
- totalCoresAcquired < maxCores &&
- mem >= calculateTotalMemory(sc) &&
- cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId.put(taskId, slaveId)
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- // Gather cpu resources from the available resources and use them in the task.
- val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.getResourcesList, "cpus", cpusToUse)
- val (_, memResourcesToUse) =
- partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
- val taskBuilder = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
- .setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
- }
-
- // Accept the offer and launch the task
- logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
- d.launchTasks(
- Collections.singleton(offer.getId),
- Collections.singleton(taskBuilder.build()), filters)
- } else {
- // Decline the offer
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- d.declineOffer(offer.getId)
+ val offerId = offer.getId.getValue
+ val resources = remainingResources(offerId)
+
+ if (canLaunchTask(slaveId, resources)) {
+ // Create a task
+ launchTasks = true
+ val taskId = newMesosTaskId()
+ val offerCPUs = getResource(resources, "cpus").toInt
+
+ val taskCPUs = executorCores(offerCPUs)
+ val taskMemory = executorMemory(sc)
+
+ slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
+
+ val (afterCPUResources, cpuResourcesToUse) =
+ partitionResources(resources, "cpus", taskCPUs)
+ val (resourcesLeft, memResourcesToUse) =
+ partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+
+ val taskBuilder = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
+ .setName("Task " + taskId)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
}
- } else {
- // This offer does not meet constraints. We don't need to see it again.
- // Decline the offer for a long period of time.
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
- + s" for $rejectOfferDurationForUnmetConstraints seconds")
- d.declineOffer(offer.getId, Filters.newBuilder()
- .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+
+ tasks(offer.getId) ::= taskBuilder.build()
+ remainingResources(offerId) = resourcesLeft.asJava
+ totalCoresAcquired += taskCPUs
+ coresByTaskId(taskId) = taskCPUs
}
}
}
+ tasks.toMap
+ }
+
+ private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
+ val offerMem = getResource(resources, "mem")
+ val offerCPUs = getResource(resources, "cpus").toInt
+ val cpus = executorCores(offerCPUs)
+ val mem = executorMemory(sc)
+
+ cpus > 0 &&
+ cpus <= offerCPUs &&
+ cpus + totalCoresAcquired <= maxCores &&
+ mem <= offerMem &&
+ numExecutors() < executorLimit &&
+ slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
}
+ private def executorCores(offerCPUs: Int): Int = {
+ sc.conf.getInt("spark.executor.cores",
+ math.min(offerCPUs, maxCores - totalCoresAcquired))
+ }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val taskId = status.getTaskId.getValue.toInt
- val state = status.getState
- logInfo(s"Mesos task $taskId is now $state")
- val slaveId: String = status.getSlaveId.getValue
+ val taskId = status.getTaskId.getValue
+ val slaveId = status.getSlaveId.getValue
+ val state = TaskState.fromMesos(status.getState)
+
+ logInfo(s"Mesos task $taskId is now ${status.getState}")
+
stateLock.synchronized {
+ val slave = slaves(slaveId)
+
// If the shuffle service is enabled, have the driver register with each one of the
// shuffle services. This allows the shuffle services to clean up state associated with
// this application when the driver exits. There is currently not a great way to detect
// this through Mesos, since the shuffle services are set up independently.
- if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
- slaveIdToHost.contains(slaveId) &&
- shuffleServiceEnabled) {
+ if (state.equals(TaskState.RUNNING) &&
+ shuffleServiceEnabled &&
+ !slave.shuffleRegistered) {
assume(mesosExternalShuffleClient.isDefined,
"External shuffle client was not instantiated even though shuffle service is enabled.")
// TODO: Remove this and allow the MesosExternalShuffleService to detect
// framework termination when new Mesos Framework HTTP API is available.
val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
- val hostname = slaveIdToHost.remove(slaveId).get
+
logDebug(s"Connecting to shuffle service on slave $slaveId, " +
- s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
+ s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
+
mesosExternalShuffleClient.get
- .registerDriverWithShuffleService(hostname, externalShufflePort)
+ .registerDriverWithShuffleService(slave.hostname, externalShufflePort)
+ slave.shuffleRegistered = true
}
- if (TaskState.isFinished(TaskState.fromMesos(state))) {
- val slaveId = taskIdToSlaveId.get(taskId)
- slaveIdsWithExecutors -= slaveId
- taskIdToSlaveId.remove(taskId)
+ if (TaskState.isFinished(state)) {
// Remove the cores we have remembered for this task, if it's in the hashmap
for (cores <- coresByTaskId.get(taskId)) {
totalCoresAcquired -= cores
coresByTaskId -= taskId
}
// If it was a failure, mark the slave as failed for blacklisting purposes
- if (TaskState.isFailed(TaskState.fromMesos(state))) {
- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+ if (TaskState.isFailed(state)) {
+ slave.taskFailures += 1
+
+ if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
"is Spark installed on it?")
}
}
- executorTerminated(d, slaveId, s"Executor finished with state $state")
+ executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
d.reviveOffers()
}
@@ -396,20 +489,24 @@ private[spark] class CoarseMesosSchedulerBackend(
stopCalled = true
super.stop()
}
+
// Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
// See SPARK-12330
val stopwatch = new Stopwatch()
stopwatch.start()
+
// slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
- while (slaveIdsWithExecutors.nonEmpty &&
+ while (numExecutors() > 0 &&
stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
Thread.sleep(100)
}
- if (slaveIdsWithExecutors.nonEmpty) {
- logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors "
+
+ if (numExecutors() > 0) {
+ logWarning(s"Timed out waiting for ${numExecutors()} remaining executors "
+ s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
+ "on the mesos nodes.")
}
+
if (mesosDriver != null) {
mesosDriver.stop()
}
@@ -418,40 +515,25 @@ private[spark] class CoarseMesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
/**
- * Called when a slave is lost or a Mesos task finished. Update local view on
- * what tasks are running and remove the terminated slave from the list of pending
- * slave IDs that we might have asked to be killed. It also notifies the driver
- * that an executor was removed.
+ * Called when a slave is lost or a Mesos task finished. Updates local view on
+ * what tasks are running. It also notifies the driver that an executor was removed.
*/
- private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+ private def executorTerminated(d: SchedulerDriver,
+ slaveId: String,
+ taskId: String,
+ reason: String): Unit = {
stateLock.synchronized {
- if (slaveIdsWithExecutors.contains(slaveId)) {
- val slaveIdToTaskId = taskIdToSlaveId.inverse()
- if (slaveIdToTaskId.containsKey(slaveId)) {
- val taskId: Int = slaveIdToTaskId.get(slaveId)
- taskIdToSlaveId.remove(taskId)
- removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
- }
- // TODO: This assumes one Spark executor per Mesos slave,
- // which may no longer be true after SPARK-5095
- pendingRemovedSlaveIds -= slaveId
- slaveIdsWithExecutors -= slaveId
- }
+ removeExecutor(taskId, SlaveLost(reason))
+ slaves(slaveId).taskIDs.remove(taskId)
}
}
- private def sparkExecutorId(slaveId: String, taskId: String): String = {
- s"$slaveId/$taskId"
- }
-
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
- executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
+ logInfo("Mesos executor lost: %s".format(e.getValue))
}
override def applicationId(): String =
@@ -471,23 +553,26 @@ private[spark] class CoarseMesosSchedulerBackend(
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
- return false
- }
-
- val slaveIdToTaskId = taskIdToSlaveId.inverse()
- for (executorId <- executorIds) {
- val slaveId = executorId.split("/")(0)
- if (slaveIdToTaskId.containsKey(slaveId)) {
- mesosDriver.killTask(
- TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
- pendingRemovedSlaveIds += slaveId
- } else {
- logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+ false
+ } else {
+ for (executorId <- executorIds) {
+ val taskId = TaskID.newBuilder().setValue(executorId).build()
+ mesosDriver.killTask(taskId)
}
+ // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+ // the desired limit through a call to `doRequestTotalExecutors`.
+ // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+ true
}
- // no need to adjust `executorLimitOption` since the AllocationManager already communicated
- // the desired limit through a call to `doRequestTotalExecutors`.
- // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
- true
}
+
+ private def numExecutors(): Int = {
+ slaves.values.map(_.taskIDs.size).sum
+ }
+}
+
+private class Slave(val hostname: String) {
+ val taskIDs = new HashSet[String]()
+ var taskFailures = 0
+ var shuffleRegistered = false
}
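For intuition on the buildMesosTasks / executorCores logic above: matched offers are walked repeatedly, and each pass places at most one executor per offer until no offer can fit another, so executors spread across offers while a large offer can also host several. A simplified, self-contained sketch of that packing loop, using plain integers instead of Mesos resources and assuming spark.executor.cores is set (the real code falls back to min(offerCPUs, maxCores - totalCoresAcquired) when it is not):

object RoundRobinPackingSketch {
  def main(args: Array[String]): Unit = {
    val executorCores = 4    // e.g. spark.executor.cores
    val executorMem   = 1024 // MB needed per executor (illustrative)
    val maxCores      = 16   // e.g. spark.cores.max
    var offers = Seq(("o1", 10, 4096), ("o2", 6, 4096))  // (offerId, freeCpus, freeMemMb)
    var totalCoresAcquired = 0
    var launched = Map.empty[String, Int]

    var progress = true
    while (progress) {
      progress = false
      offers = offers.map { case (id, cpus, mem) =>
        val fits = executorCores <= cpus && executorMem <= mem &&
          totalCoresAcquired + executorCores <= maxCores
        if (fits) {
          progress = true
          totalCoresAcquired += executorCores
          launched = launched.updated(id, launched.getOrElse(id, 0) + 1)
          (id, cpus - executorCores, mem - executorMem)  // consume the resources just used
        } else {
          (id, cpus, mem)
        }
      }
    }
    println(launched)  // Map(o1 -> 2, o2 -> 1): two executors on o1, one on o2, 12 of 16 cores
  }
}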
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 340f29bac9..8929d8a427 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -138,7 +138,7 @@ private[spark] class MesosSchedulerBackend(
val (resourcesAfterCpu, usedCpuResources) =
partitionResources(availableResources, "cpus", mesosExecutorCores)
val (resourcesAfterMem, usedMemResources) =
- partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
+ partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
builder.addAllResources(usedCpuResources.asJava)
builder.addAllResources(usedMemResources.asJava)
@@ -250,7 +250,7 @@ private[spark] class MesosSchedulerBackend(
// check offers for
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
- val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+ val meetsMemoryRequirements = mem >= executorMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
val meetsRequirements =
(meetsMemoryRequirements && meetsCPURequirements) ||
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index f9f5da9bc8..a98f2f1fe5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -140,15 +140,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
}
- /**
- * Signal that the scheduler has registered with Mesos.
- */
- protected def getResource(res: JList[Resource], name: String): Double = {
+ def getResource(res: JList[Resource], name: String): Double = {
// A resource can have multiple values in the offer since it can either be from
// a specific role or wildcard.
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}
+ /**
+ * Signal that the scheduler has registered with Mesos.
+ */
protected def markRegistered(): Unit = {
registerLatch.countDown()
}
@@ -337,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
* (whichever is larger)
*/
- def calculateTotalMemory(sc: SparkContext): Int = {
+ def executorMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
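The renamed executorMemory helper returns spark.executor.memory plus an overhead of max(MEMORY_OVERHEAD_FRACTION * spark.executor.memory, MEMORY_OVERHEAD_MINIMUM), unless spark.mesos.executor.memoryOverhead overrides the overhead outright. A worked sketch, with the fraction (0.10) and minimum (384 MB) inferred from the expectations in MesosSchedulerUtilsSuite further down rather than stated in this hunk:

object ExecutorMemorySketch {
  val MEMORY_OVERHEAD_FRACTION = 0.10  // inferred: 4096 MB -> 4505 MB total
  val MEMORY_OVERHEAD_MINIMUM  = 384   // inferred: 512 MB -> 896 MB total

  def executorMemory(executorMemoryMb: Int, overheadOverrideMb: Option[Int] = None): Int = {
    val overhead = overheadOverrideMb.getOrElse(
      math.max(MEMORY_OVERHEAD_FRACTION * executorMemoryMb, MEMORY_OVERHEAD_MINIMUM).toInt)
    executorMemoryMb + overhead
  }

  def main(args: Array[String]): Unit = {
    println(executorMemory(512))              // 512 + 384 = 896
    println(executorMemory(4096))             // 4096 + (0.1 * 4096).toInt = 4505
    println(executorMemory(1024, Some(512)))  // 1024 + 512 = 1536 (explicit memoryOverhead)
  }
}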
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index a4110d2d46..e542aa0cfc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -17,19 +17,23 @@
package org.apache.spark.scheduler.cluster.mesos
-import java.util
import java.util.Collections
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.Matchers
+import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
+import org.apache.spark.rpc.{RpcEndpointRef}
import org.apache.spark.scheduler.TaskSchedulerImpl
class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
@@ -37,6 +41,223 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
with MockitoSugar
with BeforeAndAfter {
+ var sparkConf: SparkConf = _
+ var driver: SchedulerDriver = _
+ var taskScheduler: TaskSchedulerImpl = _
+ var backend: CoarseMesosSchedulerBackend = _
+ var externalShuffleClient: MesosExternalShuffleClient = _
+ var driverEndpoint: RpcEndpointRef = _
+
+ test("mesos supports killing and limiting executors") {
+ setBackend()
+ sparkConf.set("spark.driver.host", "driverHost")
+ sparkConf.set("spark.driver.port", "1234")
+
+ val minMem = backend.executorMemory(sc)
+ val minCpu = 4
+ val offers = List((minMem, minCpu))
+
+ // launches a task on a valid offer
+ offerResources(offers)
+ verifyTaskLaunched("o1")
+
+ // kills executors
+ backend.doRequestTotalExecutors(0)
+ assert(backend.doKillExecutors(Seq("0")))
+ val taskID0 = createTaskId("0")
+ verify(driver, times(1)).killTask(taskID0)
+
+ // doesn't launch a new task when requested executors == 0
+ offerResources(offers, 2)
+ verifyDeclinedOffer(driver, createOfferId("o2"))
+
+ // Launches a new task when requested executors is positive
+ backend.doRequestTotalExecutors(2)
+ offerResources(offers, 2)
+ verifyTaskLaunched("o2")
+ }
+
+ test("mesos supports killing and relaunching tasks with executors") {
+ setBackend()
+
+ // launches a task on a valid offer
+ val minMem = backend.executorMemory(sc) + 1024
+ val minCpu = 4
+ val offer1 = (minMem, minCpu)
+ val offer2 = (minMem, 1)
+ offerResources(List(offer1, offer2))
+ verifyTaskLaunched("o1")
+
+ // accounts for a killed task
+ val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
+ backend.statusUpdate(driver, status)
+ verify(driver, times(1)).reviveOffers()
+
+ // Launches a new task on a valid offer from the same slave
+ offerResources(List(offer2))
+ verifyTaskLaunched("o2")
+ }
+
+ test("mesos supports spark.executor.cores") {
+ val executorCores = 4
+ setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ val offers = List((executorMemory * 2, executorCores + 1))
+ offerResources(offers)
+
+ val taskInfos = verifyTaskLaunched("o1")
+ assert(taskInfos.size() == 1)
+
+ val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ assert(cpus == executorCores)
+ }
+
+ test("mesos supports unset spark.executor.cores") {
+ setBackend()
+
+ val executorMemory = backend.executorMemory(sc)
+ val offerCores = 10
+ offerResources(List((executorMemory * 2, offerCores)))
+
+ val taskInfos = verifyTaskLaunched("o1")
+ assert(taskInfos.size() == 1)
+
+ val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ assert(cpus == offerCores)
+ }
+
+ test("mesos does not acquire more than spark.cores.max") {
+ val maxCores = 10
+ setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List((executorMemory, maxCores + 1)))
+
+ val taskInfos = verifyTaskLaunched("o1")
+ assert(taskInfos.size() == 1)
+
+ val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ assert(cpus == maxCores)
+ }
+
+ test("mesos declines offers that violate attribute constraints") {
+ setBackend(Map("spark.mesos.constraints" -> "x:true"))
+ offerResources(List((backend.executorMemory(sc), 4)))
+ verifyDeclinedOffer(driver, createOfferId("o1"), true)
+ }
+
+ test("mesos assigns tasks round-robin on offers") {
+ val executorCores = 4
+ val maxCores = executorCores * 2
+ setBackend(Map("spark.executor.cores" -> executorCores.toString,
+ "spark.cores.max" -> maxCores.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List(
+ (executorMemory * 2, executorCores * 2),
+ (executorMemory * 2, executorCores * 2)))
+
+ verifyTaskLaunched("o1")
+ verifyTaskLaunched("o2")
+ }
+
+ test("mesos creates multiple executors on a single slave") {
+ val executorCores = 4
+ setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+ // offer with room for two executors
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List((executorMemory * 2, executorCores * 2)))
+
+ // verify two executors were started on a single offer
+ val taskInfos = verifyTaskLaunched("o1")
+ assert(taskInfos.size() == 2)
+ }
+
+ test("mesos doesn't register twice with the same shuffle service") {
+ setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+ verifyTaskLaunched("o1")
+
+ val offer2 = createOffer("o2", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer2).asJava)
+ verifyTaskLaunched("o2")
+
+ val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
+ backend.statusUpdate(driver, status1)
+
+ val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
+ backend.statusUpdate(driver, status2)
+ verify(externalShuffleClient, times(1)).registerDriverWithShuffleService(anyString, anyInt)
+ }
+
+ test("mesos kills an executor when told") {
+ setBackend()
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+ verifyTaskLaunched("o1")
+
+ backend.doKillExecutors(List("0"))
+ verify(driver, times(1)).killTask(createTaskId("0"))
+ }
+
+ private def verifyDeclinedOffer(driver: SchedulerDriver,
+ offerId: OfferID,
+ filter: Boolean = false): Unit = {
+ if (filter) {
+ verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters])
+ } else {
+ verify(driver, times(1)).declineOffer(Matchers.eq(offerId))
+ }
+ }
+
+ private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+ val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
+ createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+
+ backend.resourceOffers(driver, mesosOffers.asJava)
+ }
+
+ private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
+ val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(createOfferId(offerId))),
+ captor.capture())
+ captor.getValue
+ }
+
+ private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
+ TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+ .setState(state)
+ .build
+ }
+
+
+ private def createOfferId(offerId: String): OfferID = {
+ OfferID.newBuilder().setValue(offerId).build()
+ }
+
+ private def createSlaveId(slaveId: String): SlaveID = {
+ SlaveID.newBuilder().setValue(slaveId).build()
+ }
+
+ private def createExecutorId(executorId: String): ExecutorID = {
+ ExecutorID.newBuilder().setValue(executorId).build()
+ }
+
+ private def createTaskId(taskId: String): TaskID = {
+ TaskID.newBuilder().setValue(taskId).build()
+ }
+
private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
@@ -47,8 +268,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(OfferID.newBuilder()
- .setValue(offerId).build())
+ builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
@@ -58,130 +278,55 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
- driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+ driver: SchedulerDriver,
+ shuffleClient: MesosExternalShuffleClient,
+ endpoint: RpcEndpointRef): CoarseMesosSchedulerBackend = {
val securityManager = mock[SecurityManager]
+
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = driver
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = driver
+
+ override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
+
+ override protected def createDriverEndpointRef(
+ properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
+
markRegistered()
}
backend.start()
backend
}
- var sparkConf: SparkConf = _
-
- before {
+ private def setBackend(sparkConfVars: Map[String, String] = null) {
sparkConf = (new SparkConf)
.setMaster("local[*]")
.setAppName("test-mesos-dynamic-alloc")
.setSparkHome("/path")
- sc = new SparkContext(sparkConf)
- }
-
- test("mesos supports killing and limiting executors") {
- val driver = mock[SchedulerDriver]
- when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
- val taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.sc).thenReturn(sc)
-
- sparkConf.set("spark.driver.host", "driverHost")
- sparkConf.set("spark.driver.port", "1234")
-
- val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc)
- val minCpu = 4
-
- val mesosOffers = new java.util.ArrayList[Offer]
- mesosOffers.add(createOffer("o1", "s1", minMem, minCpu))
-
- val taskID0 = TaskID.newBuilder().setValue("0").build()
-
- backend.resourceOffers(driver, mesosOffers)
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- any[util.Collection[TaskInfo]],
- any[Filters])
-
- // simulate the allocation manager down-scaling executors
- backend.doRequestTotalExecutors(0)
- assert(backend.doKillExecutors(Seq("s1/0")))
- verify(driver, times(1)).killTask(taskID0)
-
- val mesosOffers2 = new java.util.ArrayList[Offer]
- mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu))
- backend.resourceOffers(driver, mesosOffers2)
-
- verify(driver, times(1))
- .declineOffer(OfferID.newBuilder().setValue("o2").build())
-
- // Verify we didn't launch any new executor
- assert(backend.slaveIdsWithExecutors.size === 1)
-
- backend.doRequestTotalExecutors(2)
- backend.resourceOffers(driver, mesosOffers2)
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)),
- any[util.Collection[TaskInfo]],
- any[Filters])
+ if (sparkConfVars != null) {
+ for (attr <- sparkConfVars) {
+ sparkConf.set(attr._1, attr._2)
+ }
+ }
- assert(backend.slaveIdsWithExecutors.size === 2)
- backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build())
- assert(backend.slaveIdsWithExecutors.size === 1)
- }
+ sc = new SparkContext(sparkConf)
- test("mesos supports killing and relaunching tasks with executors") {
- val driver = mock[SchedulerDriver]
+ driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
- val taskScheduler = mock[TaskSchedulerImpl]
+ taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
+ externalShuffleClient = mock[MesosExternalShuffleClient]
+ driverEndpoint = mock[RpcEndpointRef]
- val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc) + 1024
- val minCpu = 4
-
- val mesosOffers = new java.util.ArrayList[Offer]
- val offer1 = createOffer("o1", "s1", minMem, minCpu)
- mesosOffers.add(offer1)
-
- val offer2 = createOffer("o2", "s1", minMem, 1);
-
- backend.resourceOffers(driver, mesosOffers)
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(offer1.getId)),
- anyObject(),
- anyObject[Filters])
-
- // Simulate task killed, executor no longer running
- val status = TaskStatus.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue("0").build())
- .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
- .setState(TaskState.TASK_KILLED)
- .build
-
- backend.statusUpdate(driver, status)
- assert(!backend.slaveIdsWithExecutors.contains("s1"))
-
- mesosOffers.clear()
- mesosOffers.add(offer2)
- backend.resourceOffers(driver, mesosOffers)
- assert(backend.slaveIdsWithExecutors.contains("s1"))
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(offer2.getId)),
- anyObject(),
- anyObject[Filters])
-
- verify(driver, times(1)).reviveOffers()
+ backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index e111e2e9f6..3fb3279073 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -189,7 +189,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
- val minMem = backend.calculateTotalMemory(sc)
+ val minMem = backend.executorMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 2eb43b7313..85437b2f80 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -41,20 +41,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("use at-least minimum overhead") {
val f = fixture
when(f.sc.executorMemory).thenReturn(512)
- utils.calculateTotalMemory(f.sc) shouldBe 896
+ utils.executorMemory(f.sc) shouldBe 896
}
test("use overhead if it is greater than minimum value") {
val f = fixture
when(f.sc.executorMemory).thenReturn(4096)
- utils.calculateTotalMemory(f.sc) shouldBe 4505
+ utils.executorMemory(f.sc) shouldBe 4505
}
test("use spark.mesos.executor.memoryOverhead (if set)") {
val f = fixture
when(f.sc.executorMemory).thenReturn(1024)
f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
- utils.calculateTotalMemory(f.sc) shouldBe 1536
+ utils.executorMemory(f.sc) shouldBe 1536
}
test("parse a non-empty constraint string correctly") {
diff --git a/docs/configuration.md b/docs/configuration.md
index cd9dc1bcfc..b07c69cd4c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -825,13 +825,18 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.executor.cores</code></td>
- <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
<td>
- The number of cores to use on each executor. For YARN and standalone mode only.
+ 1 in YARN mode, all the available cores on the worker in
+ standalone and Mesos coarse-grained modes.
+ </td>
+ <td>
+ The number of cores to use on each executor.
- In standalone mode, setting this parameter allows an application to run multiple executors on
- the same worker, provided that there are enough cores on that worker. Otherwise, only one
- executor per application will run on each worker.
+ In standalone and Mesos coarse-grained modes, setting this
+ parameter allows an application to run multiple executors on the
+ same worker, provided that there are enough cores on that
+ worker. Otherwise, only one executor per application will run on
+ each worker.
</td>
</tr>
<tr>
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e1c87a8d95..0df476d9b4 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -277,9 +277,11 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.extra.cores</code></td>
<td><code>0</code></td>
<td>
- Set the extra amount of cpus to request per task. This setting is only used for Mesos coarse grain mode.
- The total amount of cores requested per task is the number of cores in the offer plus the extra cores configured.
- Note that total amount of cores the executor will request in total will not exceed the <code>spark.cores.max</code> setting.
+ Set the extra number of cores for an executor to advertise. This
+ does not result in more cores allocated. It instead means that an
+ executor will "pretend" it has more cores, so that the driver will
+ send it more tasks. Use this to increase parallelism. This
+ setting is only used for Mesos coarse-grained mode.
</td>
</tr>
<tr>