-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala      |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala            |  28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala            | 118
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala              | 126
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala |  19
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala       | 106
-rw-r--r--  docs/running-on-mesos.md                                                                            |  22
7 files changed, 358 insertions(+), 96 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cbade13149..b7fde0d9b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{List => JList, Collections}
import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
@@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(
/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation
- * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
+ * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
*/
private var executorLimitOption: Option[Int] = None
@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(
override def start() {
super.start()
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
+ val driver = createSchedulerDriver(
+ master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+ startScheduler(driver)
}
def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
- val task = MesosTaskInfo.newBuilder()
+ // Gather cpu resources from the available resources and use them in the task.
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+ val (_, memResourcesToUse) =
+ partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+ val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
- .addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", calculateTotalMemory(sc)))
+ .addAllResources(cpuResourcesToUse)
+ .addAllResources(memResourcesToUse)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(
Collections.singleton(offer.getId),
- Collections.singleton(task.build()), filters)
+ Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
- logInfo("Mesos task " + taskId + " is now " + state)
+ logInfo(s"Mesos task $taskId is now $state")
stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
@@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (TaskState.isFailed(TaskState.fromMesos(state))) {
failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+ logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
"is Spark installed on it?")
}
}
@@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
+ logError(s"Mesos error: $message")
scheduler.error(message)
}
@@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
- logInfo("Mesos slave lost: " + slaveId.getValue)
+ logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
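
With this change the coarse-grained task no longer receives a single flat "cpus"/"mem" resource; it receives whatever slices partitionResources carved out of the offer, one entry per role. A minimal sketch of the new pattern, assuming an in-scope Mesos Offer and the helpers introduced by this patch (partitionResources, calculateTotalMemory):

    // Carve the requested CPU and memory out of the offer, preserving each
    // resource's role so role-reserved capacity is spent correctly.
    val (afterCpus, cpuResourcesToUse) =
      partitionResources(offer.getResourcesList, "cpus", cpusToUse)
    val (_, memResourcesToUse) =
      partitionResources(afterCpus, "mem", calculateTotalMemory(sc))

    val taskBuilder = MesosTaskInfo.newBuilder()
      .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
      .setSlaveId(offer.getSlaveId)
      .addAllResources(cpuResourcesToUse)  // possibly several entries, one per role
      .addAllResources(memResourcesToUse)
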
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d3a20f8221..f078547e71 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler(
def start(): Unit = {
// TODO: Implement leader election to make sure only one framework running in the cluster.
val fwId = schedulerState.fetch[String]("frameworkId")
- val builder = FrameworkInfo.newBuilder()
- .setUser(Utils.getCurrentUserName())
- .setName(appName)
- .setWebuiUrl(frameworkUrl)
- .setCheckpoint(true)
- .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
fwId.foreach { id =>
- builder.setId(FrameworkID.newBuilder().setValue(id).build())
frameworkId = id
}
recoverState()
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
metricsSystem.start()
- startScheduler(master, MesosClusterScheduler.this, builder.build())
+ val driver = createSchedulerDriver(
+ master,
+ MesosClusterScheduler.this,
+ Utils.getCurrentUserName(),
+ appName,
+ conf,
+ Some(frameworkUrl),
+ Some(true),
+ Some(Integer.MAX_VALUE),
+ fwId)
+
+ startScheduler(driver)
ready = true
}
@@ -449,12 +453,8 @@ private[spark] class MesosClusterScheduler(
offer.cpu -= driverCpu
offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus").setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
- val memResource = Resource.newBuilder()
- .setName("mem").setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+ val cpuResource = createResource("cpus", driverCpu)
+ val memResource = createResource("mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
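
The cluster scheduler now leans on the shared createResource helper (defined later in MesosSchedulerUtils) instead of repeating the protobuf builder chain. A sketch of the equivalence, using the helper's default wildcard role:

    // createResource("cpus", driverCpu) is shorthand for roughly:
    Resource.newBuilder()
      .setName("cpus")
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(driverCpu))
      .build()
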
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d72e2af456..3f63ec1c58 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
+
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -45,8 +46,8 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with MesosSchedulerUtils {
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
+ // Stores the slave ids that have launched a Mesos executor.
+ val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks
@@ -66,12 +67,21 @@ private[spark] class MesosSchedulerBackend(
@volatile var appId: String = _
override def start() {
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
classLoader = Thread.currentThread.getContextClassLoader
- startScheduler(master, MesosSchedulerBackend.this, fwInfo)
+ val driver = createSchedulerDriver(
+ master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+ startScheduler(driver)
}
- def createExecutorInfo(execId: String): MesosExecutorInfo = {
+ /**
+ * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ * @param availableResources Available resources that are offered by Mesos
+ * @param execId The executor id to assign to this new executor.
+ * @return A tuple of the new Mesos executor info and the remaining available resources.
+ */
+ def createExecutorInfo(
+ availableResources: JList[Resource],
+ execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
@@ -115,32 +125,25 @@ private[spark] class MesosSchedulerBackend(
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
- val cpus = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder()
- .setValue(mesosExecutorCores).build())
- .build()
- val memory = Resource.newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(
- Value.Scalar.newBuilder()
- .setValue(calculateTotalMemory(sc)).build())
- .build()
- val executorInfo = MesosExecutorInfo.newBuilder()
+ val builder = MesosExecutorInfo.newBuilder()
+ val (resourcesAfterCpu, usedCpuResources) =
+ partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+ val (resourcesAfterMem, usedMemResources) =
+ partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+
+ builder.addAllResources(usedCpuResources)
+ builder.addAllResources(usedMemResources)
+ val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
- .addResources(cpus)
- .addResources(memory)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}
- executorInfo.build()
+ (executorInfo.build(), resourcesAfterMem)
}
/**
@@ -183,6 +186,18 @@ private[spark] class MesosSchedulerBackend(
override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+ private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+ val builder = new StringBuilder
+ tasks.foreach { t =>
+ builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+ .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+ .append("Task resources: ").append(t.getResourcesList).append("\n")
+ .append("Executor resources: ").append(t.getExecutor.getResourcesList)
+ .append("---------------------------------------------\n")
+ }
+ builder.toString()
+ }
+
/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
@@ -207,7 +222,7 @@ private[spark] class MesosSchedulerBackend(
val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
- (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+ (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
@@ -221,7 +236,7 @@ private[spark] class MesosSchedulerBackend(
unUsableOffers.foreach(o => d.declineOffer(o.getId))
val workerOffers = usableOffers.map { o =>
- val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+ val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
@@ -236,6 +251,10 @@ private[spark] class MesosSchedulerBackend(
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+ val slaveIdToResources = new HashMap[String, JList[Resource]]()
+ usableOffers.foreach { o =>
+ slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+ }
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
@@ -245,15 +264,19 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
- offer.foreach { taskDesc =>
- val slaveId = taskDesc.executorId
- slaveIdsWithExecutors += slaveId
- slavesIdsOfAcceptedOffers += slaveId
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
- .add(createMesosTask(taskDesc, slaveId))
+ offer.foreach { taskDesc =>
+ val slaveId = taskDesc.executorId
+ slavesIdsOfAcceptedOffers += slaveId
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ val (mesosTask, remainingResources) = createMesosTask(
+ taskDesc,
+ slaveIdToResources(slaveId),
+ slaveId)
+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+ .add(mesosTask)
+ slaveIdToResources(slaveId) = remainingResources
+ }
}
- }
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -264,6 +287,7 @@ private[spark] class MesosSchedulerBackend(
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
+ logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
@@ -272,26 +296,32 @@ private[spark] class MesosSchedulerBackend(
for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
d.declineOffer(o.getId)
}
-
}
}
- /** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+ /** Turn a Spark TaskDescription into a Mesos task, also returning the resources unused by the task */
+ def createMesosTask(
+ task: TaskDescription,
+ resources: JList[Resource],
+ slaveId: String): (MesosTaskInfo, JList[Resource]) = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
- .build()
- MesosTaskInfo.newBuilder()
+ val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
+ (slaveIdToExecutorInfo(slaveId), resources)
+ } else {
+ createExecutorInfo(resources, slaveId)
+ }
+ slaveIdToExecutorInfo(slaveId) = executorInfo
+ val (finalResources, cpuResources) =
+ partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
+ val taskInfo = MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(createExecutorInfo(slaveId))
+ .setExecutor(executorInfo)
.setName(task.name)
- .addResources(cpuResource)
+ .addAllResources(cpuResources)
.setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
.build()
+ (taskInfo, finalResources)
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
@@ -337,7 +367,7 @@ private[spark] class MesosSchedulerBackend(
private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
- slaveIdsWithExecutors -= slaveId
+ slaveIdToExecutorInfo -= slaveId
}
}
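
Because several Spark tasks can land on the same slave in one offer round, the fine-grained backend threads the leftover resources from each createMesosTask call into the next one. A condensed sketch of that bookkeeping, using the names from this patch:

    // Track the resources still available on each slave as tasks are assigned.
    val slaveIdToResources = new HashMap[String, JList[Resource]]()
    usableOffers.foreach { o =>
      slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
    }

    acceptedOffers.foreach { offer =>
      offer.foreach { taskDesc =>
        val slaveId = taskDesc.executorId
        // Each task consumes CPUS_PER_TASK (plus executor resources on first launch)
        // and hands back what is left for the next task on the same slave.
        val (mesosTask, remainingResources) =
          createMesosTask(taskDesc, slaveIdToResources(slaveId), slaveId)
        mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]).add(mesosTask)
        slaveIdToResources(slaveId) = remainingResources
      }
    }
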
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 925702e63a..c04920e4f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -21,15 +21,17 @@ import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.GeneratedMessage
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
import org.apache.spark.util.Utils
+
/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@@ -42,13 +44,63 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
protected var mesosDriver: SchedulerDriver = null
/**
- * Starts the MesosSchedulerDriver with the provided information. This method returns
- * only after the scheduler has registered with Mesos.
- * @param masterUrl Mesos master connection URL
- * @param scheduler Scheduler object
- * @param fwInfo FrameworkInfo to pass to the Mesos master
+ * Creates a new MesosSchedulerDriver that communicates with the Mesos master.
+ * @param masterUrl The URL of the Mesos master to connect to
+ * @param scheduler The Scheduler object that receives scheduler callbacks
+ * @param sparkUser User to impersonate when running tasks
+ * @param appName The framework name to display on the Mesos UI
+ * @param conf Spark configuration
+ * @param webuiUrl The web UI URL to link to from the Mesos UI
+ * @param checkpoint Option to checkpoint tasks for failover
+ * @param failoverTimeout Duration within which the Mesos master expects the scheduler to reconnect after a disconnect
+ * @param frameworkId The id of the new framework
*/
- def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+ protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+ val credBuilder = Credential.newBuilder()
+ webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+ checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+ failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+ frameworkId.foreach { id =>
+ fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+ }
+ conf.getOption("spark.mesos.principal").foreach { principal =>
+ fwInfoBuilder.setPrincipal(principal)
+ credBuilder.setPrincipal(principal)
+ }
+ conf.getOption("spark.mesos.secret").foreach { secret =>
+ credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+ }
+ if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+ throw new SparkException(
+ "spark.mesos.principal must be configured when spark.mesos.secret is set")
+ }
+ conf.getOption("spark.mesos.role").foreach { role =>
+ fwInfoBuilder.setRole(role)
+ }
+ if (credBuilder.hasPrincipal) {
+ new MesosSchedulerDriver(
+ scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+ } else {
+ new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+ }
+ }
+
+ /**
+ * Starts the MesosSchedulerDriver and stores it as the currently running driver.
+ * The given driver is expected not to be running yet.
+ * This method returns only after the scheduler has registered with Mesos.
+ */
+ def startScheduler(newDriver: SchedulerDriver): Unit = {
synchronized {
if (mesosDriver != null) {
registerLatch.await()
@@ -59,11 +111,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
setDaemon(true)
override def run() {
- mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+ mesosDriver = newDriver
try {
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
- if (ret.equals(Status.DRIVER_ABORTED)) {
+ if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
System.exit(1)
}
} catch {
@@ -82,18 +134,62 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Signal that the scheduler has registered with Mesos.
*/
+ protected def getResource(res: JList[Resource], name: String): Double = {
+ // A resource can have multiple values in the offer since it can either be from
+ // a specific role or wildcard.
+ res.filter(_.getName == name).map(_.getScalar.getValue).sum
+ }
+
protected def markRegistered(): Unit = {
registerLatch.countDown()
}
+ def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+ val builder = Resource.newBuilder()
+ .setName(name)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
+
+ role.foreach { r => builder.setRole(r) }
+
+ builder.build()
+ }
+
/**
- * Get the amount of resources for the specified type from the resource list
+ * Partition the existing set of resources into two groups, those remaining to be
+ * scheduled and those requested to be used for a new task.
+ * @param resources The full list of available resources
+ * @param resourceName The name of the resource to take from the available resources
+ * @param amountToUse The amount of resources to take from the available resources
+ * @return The remaining resources list and the used resources list.
*/
- protected def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
+ def partitionResources(
+ resources: JList[Resource],
+ resourceName: String,
+ amountToUse: Double): (List[Resource], List[Resource]) = {
+ var remain = amountToUse
+ var requestedResources = new ArrayBuffer[Resource]
+ val remainingResources = resources.map {
+ case r => {
+ if (remain > 0 &&
+ r.getType == Value.Type.SCALAR &&
+ r.getScalar.getValue > 0.0 &&
+ r.getName == resourceName) {
+ val usage = Math.min(remain, r.getScalar.getValue)
+ requestedResources += createResource(resourceName, usage, Some(r.getRole))
+ remain -= usage
+ createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+ } else {
+ r
+ }
+ }
}
- 0.0
+
+ // Filter out any resource that has been depleted.
+ val filteredResources =
+ remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
+
+ (filteredResources.toList, requestedResources.toList)
}
/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
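
partitionResources walks the offered resources in order and may satisfy a single request across several role-specific entries. A worked illustration, assuming an offer that contains 1 "prod" CPU and 2 "dev" CPUs:

    import scala.collection.JavaConversions._  // Scala List -> java.util.List

    // Offer contains: cpus(prod) = 1.0, cpus(dev) = 2.0
    val offered = List(
      createResource("cpus", 1.0, Some("prod")),
      createResource("cpus", 2.0, Some("dev")))

    // Request 2 CPUs: the "prod" reservation is drained first, then 1 CPU comes from "dev".
    val (remaining, used) = partitionResources(offered, "cpus", 2.0)
    // used      ~= [cpus(prod) = 1.0, cpus(dev) = 1.0]
    // remaining ~= [cpus(dev) = 1.0]   (fully depleted entries are filtered out)
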
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 3f1692917a..4b504df7b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.Matchers
@@ -60,7 +60,16 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
- mesosDriver = driver
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = driver
markRegistered()
}
backend.start()
@@ -80,6 +89,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("mesos supports killing and limiting executors") {
val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
@@ -87,7 +97,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
sparkConf.set("spark.driver.port", "1234")
val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc).toInt
+ val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
@@ -130,11 +140,12 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("mesos supports killing and relaunching tasks with executors") {
val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+ val minMem = backend.calculateTotalMemory(sc) + 1024
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index d01837fe78..5ed30f64d7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.Collections
+import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -60,14 +61,17 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val resources = List(
+ mesosSchedulerBackend.createResource("cpus", 4),
+ mesosSchedulerBackend.createResource("mem", 1024))
// uri is null.
- val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+ val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
assert(executorInfo.getCommand.getValue ===
s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
// uri exists.
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
- val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+ val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
assert(executorInfo1.getCommand.getValue ===
s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
}
@@ -93,7 +97,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
- val execInfo = backend.createExecutorInfo("mockExecutor")
+ val (execInfo, _) = backend.createExecutorInfo(
+ List(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
@@ -194,7 +199,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
)
verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
- assert(capture.getValue.size() == 1)
+ assert(capture.getValue.size() === 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
val cpus = taskInfo.getResourcesList.get(0)
@@ -214,4 +219,97 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
backend.resourceOffers(driver, mesosOffers2)
verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
+
+ test("can handle multiple roles") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(new SparkConf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val id = 1
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setRole("prod")
+ .setScalar(Scalar.newBuilder().setValue(500))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("prod")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(1))
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(600))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(2))
+ val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
+
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(offer)
+
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2 // Deducting 1 for executor
+ ))
+
+ val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+ val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ ).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName.equals("n1"))
+ assert(taskInfo.getResourcesCount === 1)
+ val cpusDev = taskInfo.getResourcesList.get(0)
+ assert(cpusDev.getName.equals("cpus"))
+ assert(cpusDev.getScalar.getValue.equals(1.0))
+ assert(cpusDev.getRole.equals("dev"))
+ val executorResources = taskInfo.getExecutor.getResourcesList
+ assert(executorResources.exists { r =>
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+ })
+ assert(executorResources.exists { r =>
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+ })
+ }
}
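
The documentation change below covers the three new framework-level settings read by createSchedulerDriver. A minimal sketch of wiring them up through SparkConf; the property names come from this patch, while the master URL and values are placeholders:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("mesos://mesos-master:5050")           // placeholder master URL
      .set("spark.mesos.principal", "spark-framework")  // required whenever a secret is set
      .set("spark.mesos.secret", "s3cret")              // authenticates the framework
      .set("spark.mesos.role", "prod")                  // register under a Mesos role
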
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 1f915d8ea1..debdd2adf2 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -307,6 +307,28 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.principal</code></td>
+ <td>Framework principal to authenticate to Mesos</td>
+ <td>
+ Set the principal with which the Spark framework will authenticate with Mesos.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.secret</code></td>
+ <td>Framework secret to authenticate to Mesos</td>
+ <td>
+ Set the secret with which the Spark framework will authenticate with Mesos.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.role</code></td>
+ <td>Role for the Spark framework</td>
+ <td>
+ Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations
+ and resource weight sharing.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.constraints</code></td>
<td>Attribute based constraints to be matched against when accepting resource offers.</td>
<td>