aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2015-07-16 19:36:45 -0700
committerAndrew Or <andrew@databricks.com>2015-07-16 19:37:15 -0700
commitd86bbb4e286f16f77ba125452b07827684eafeed (patch)
tree9b7cadb3740105d6eb6375d239e86bd2fcd08218
parent49351c7f597c67950cc65e5014a89fad31b9a6f7 (diff)
downloadspark-d86bbb4e286f16f77ba125452b07827684eafeed.tar.gz
spark-d86bbb4e286f16f77ba125452b07827684eafeed.tar.bz2
spark-d86bbb4e286f16f77ba125452b07827684eafeed.zip
[SPARK-6284] [MESOS] Add mesos role, principal and secret
Mesos supports framework authentication and role to be set per framework, which the role is used to identify the framework's role which impacts the sharing weight of resource allocation and optional authentication information to allow the framework to be connected to the master. Author: Timothy Chen <tnachen@gmail.com> Closes #4960 from tnachen/mesos_fw_auth and squashes the following commits: 0f9f03e [Timothy Chen] Fix review comments. 8f9488a [Timothy Chen] Fix rebase f7fc2a9 [Timothy Chen] Add mesos role, auth and secret.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala35
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala118
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala126
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala19
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala106
-rw-r--r--docs/running-on-mesos.md22
7 files changed, 358 insertions, 96 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cbade13149..b7fde0d9b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{List => JList, Collections}
import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
@@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(
/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation
- * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
+ * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
*/
private var executorLimitOption: Option[Int] = None
@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(
override def start() {
super.start()
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
+ val driver = createSchedulerDriver(
+ master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+ startScheduler(driver)
}
def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
- val task = MesosTaskInfo.newBuilder()
+ // Gather cpu resources from the available resources and use them in the task.
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+ val (_, memResourcesToUse) =
+ partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+ val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
- .addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", calculateTotalMemory(sc)))
+ .addAllResources(cpuResourcesToUse)
+ .addAllResources(memResourcesToUse)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(
Collections.singleton(offer.getId),
- Collections.singleton(task.build()), filters)
+ Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
- logInfo("Mesos task " + taskId + " is now " + state)
+ logInfo(s"Mesos task $taskId is now $state")
stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
@@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (TaskState.isFailed(TaskState.fromMesos(state))) {
failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+ logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
"is Spark installed on it?")
}
}
@@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
+ logError(s"Mesos error: $message")
scheduler.error(message)
}
@@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
- logInfo("Mesos slave lost: " + slaveId.getValue)
+ logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d3a20f8221..f078547e71 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler(
def start(): Unit = {
// TODO: Implement leader election to make sure only one framework running in the cluster.
val fwId = schedulerState.fetch[String]("frameworkId")
- val builder = FrameworkInfo.newBuilder()
- .setUser(Utils.getCurrentUserName())
- .setName(appName)
- .setWebuiUrl(frameworkUrl)
- .setCheckpoint(true)
- .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
fwId.foreach { id =>
- builder.setId(FrameworkID.newBuilder().setValue(id).build())
frameworkId = id
}
recoverState()
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
metricsSystem.start()
- startScheduler(master, MesosClusterScheduler.this, builder.build())
+ val driver = createSchedulerDriver(
+ master,
+ MesosClusterScheduler.this,
+ Utils.getCurrentUserName(),
+ appName,
+ conf,
+ Some(frameworkUrl),
+ Some(true),
+ Some(Integer.MAX_VALUE),
+ fwId)
+
+ startScheduler(driver)
ready = true
}
@@ -449,12 +453,8 @@ private[spark] class MesosClusterScheduler(
offer.cpu -= driverCpu
offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus").setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
- val memResource = Resource.newBuilder()
- .setName("mem").setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+ val cpuResource = createResource("cpus", driverCpu)
+ val memResource = createResource("mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d72e2af456..3f63ec1c58 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
+
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -45,8 +46,8 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with MesosSchedulerUtils {
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
+ // Stores the slave ids that has launched a Mesos executor.
+ val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks
@@ -66,12 +67,21 @@ private[spark] class MesosSchedulerBackend(
@volatile var appId: String = _
override def start() {
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
classLoader = Thread.currentThread.getContextClassLoader
- startScheduler(master, MesosSchedulerBackend.this, fwInfo)
+ val driver = createSchedulerDriver(
+ master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+ startScheduler(driver)
}
- def createExecutorInfo(execId: String): MesosExecutorInfo = {
+ /**
+ * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ * @param availableResources Available resources that is offered by Mesos
+ * @param execId The executor id to assign to this new executor.
+ * @return A tuple of the new mesos executor info and the remaining available resources.
+ */
+ def createExecutorInfo(
+ availableResources: JList[Resource],
+ execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
@@ -115,32 +125,25 @@ private[spark] class MesosSchedulerBackend(
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
- val cpus = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder()
- .setValue(mesosExecutorCores).build())
- .build()
- val memory = Resource.newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(
- Value.Scalar.newBuilder()
- .setValue(calculateTotalMemory(sc)).build())
- .build()
- val executorInfo = MesosExecutorInfo.newBuilder()
+ val builder = MesosExecutorInfo.newBuilder()
+ val (resourcesAfterCpu, usedCpuResources) =
+ partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+ val (resourcesAfterMem, usedMemResources) =
+ partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+
+ builder.addAllResources(usedCpuResources)
+ builder.addAllResources(usedMemResources)
+ val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
- .addResources(cpus)
- .addResources(memory)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}
- executorInfo.build()
+ (executorInfo.build(), resourcesAfterMem)
}
/**
@@ -183,6 +186,18 @@ private[spark] class MesosSchedulerBackend(
override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+ private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+ val builder = new StringBuilder
+ tasks.foreach { t =>
+ builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+ .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+ .append("Task resources: ").append(t.getResourcesList).append("\n")
+ .append("Executor resources: ").append(t.getExecutor.getResourcesList)
+ .append("---------------------------------------------\n")
+ }
+ builder.toString()
+ }
+
/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
@@ -207,7 +222,7 @@ private[spark] class MesosSchedulerBackend(
val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
- (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+ (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
@@ -221,7 +236,7 @@ private[spark] class MesosSchedulerBackend(
unUsableOffers.foreach(o => d.declineOffer(o.getId))
val workerOffers = usableOffers.map { o =>
- val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+ val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
@@ -236,6 +251,10 @@ private[spark] class MesosSchedulerBackend(
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+ val slaveIdToResources = new HashMap[String, JList[Resource]]()
+ usableOffers.foreach { o =>
+ slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+ }
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
@@ -245,15 +264,19 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
- offer.foreach { taskDesc =>
- val slaveId = taskDesc.executorId
- slaveIdsWithExecutors += slaveId
- slavesIdsOfAcceptedOffers += slaveId
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
- .add(createMesosTask(taskDesc, slaveId))
+ offer.foreach { taskDesc =>
+ val slaveId = taskDesc.executorId
+ slavesIdsOfAcceptedOffers += slaveId
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ val (mesosTask, remainingResources) = createMesosTask(
+ taskDesc,
+ slaveIdToResources(slaveId),
+ slaveId)
+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+ .add(mesosTask)
+ slaveIdToResources(slaveId) = remainingResources
+ }
}
- }
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -264,6 +287,7 @@ private[spark] class MesosSchedulerBackend(
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
+ logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
@@ -272,26 +296,32 @@ private[spark] class MesosSchedulerBackend(
for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
d.declineOffer(o.getId)
}
-
}
}
- /** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+ /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
+ def createMesosTask(
+ task: TaskDescription,
+ resources: JList[Resource],
+ slaveId: String): (MesosTaskInfo, JList[Resource]) = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
- .build()
- MesosTaskInfo.newBuilder()
+ val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
+ (slaveIdToExecutorInfo(slaveId), resources)
+ } else {
+ createExecutorInfo(resources, slaveId)
+ }
+ slaveIdToExecutorInfo(slaveId) = executorInfo
+ val (finalResources, cpuResources) =
+ partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
+ val taskInfo = MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(createExecutorInfo(slaveId))
+ .setExecutor(executorInfo)
.setName(task.name)
- .addResources(cpuResource)
+ .addAllResources(cpuResources)
.setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
.build()
+ (taskInfo, finalResources)
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
@@ -337,7 +367,7 @@ private[spark] class MesosSchedulerBackend(
private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
- slaveIdsWithExecutors -= slaveId
+ slaveIdToExecutorInfo -= slaveId
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 925702e63a..c04920e4f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -21,15 +21,17 @@ import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.GeneratedMessage
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
import org.apache.spark.util.Utils
+
/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@@ -42,13 +44,63 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
protected var mesosDriver: SchedulerDriver = null
/**
- * Starts the MesosSchedulerDriver with the provided information. This method returns
- * only after the scheduler has registered with Mesos.
- * @param masterUrl Mesos master connection URL
- * @param scheduler Scheduler object
- * @param fwInfo FrameworkInfo to pass to the Mesos master
+ * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+ * @param masterUrl The url to connect to Mesos master
+ * @param scheduler the scheduler class to receive scheduler callbacks
+ * @param sparkUser User to impersonate with when running tasks
+ * @param appName The framework name to display on the Mesos UI
+ * @param conf Spark configuration
+ * @param webuiUrl The WebUI url to link from Mesos UI
+ * @param checkpoint Option to checkpoint tasks for failover
+ * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect
+ * @param frameworkId The id of the new framework
*/
- def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+ protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+ val credBuilder = Credential.newBuilder()
+ webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+ checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+ failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+ frameworkId.foreach { id =>
+ fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+ }
+ conf.getOption("spark.mesos.principal").foreach { principal =>
+ fwInfoBuilder.setPrincipal(principal)
+ credBuilder.setPrincipal(principal)
+ }
+ conf.getOption("spark.mesos.secret").foreach { secret =>
+ credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+ }
+ if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+ throw new SparkException(
+ "spark.mesos.principal must be configured when spark.mesos.secret is set")
+ }
+ conf.getOption("spark.mesos.role").foreach { role =>
+ fwInfoBuilder.setRole(role)
+ }
+ if (credBuilder.hasPrincipal) {
+ new MesosSchedulerDriver(
+ scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+ } else {
+ new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+ }
+ }
+
+ /**
+ * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
+ * This driver is expected to not be running.
+ * This method returns only after the scheduler has registered with Mesos.
+ */
+ def startScheduler(newDriver: SchedulerDriver): Unit = {
synchronized {
if (mesosDriver != null) {
registerLatch.await()
@@ -59,11 +111,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
setDaemon(true)
override def run() {
- mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+ mesosDriver = newDriver
try {
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
- if (ret.equals(Status.DRIVER_ABORTED)) {
+ if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
System.exit(1)
}
} catch {
@@ -82,18 +134,62 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Signal that the scheduler has registered with Mesos.
*/
+ protected def getResource(res: JList[Resource], name: String): Double = {
+ // A resource can have multiple values in the offer since it can either be from
+ // a specific role or wildcard.
+ res.filter(_.getName == name).map(_.getScalar.getValue).sum
+ }
+
protected def markRegistered(): Unit = {
registerLatch.countDown()
}
+ def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+ val builder = Resource.newBuilder()
+ .setName(name)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
+
+ role.foreach { r => builder.setRole(r) }
+
+ builder.build()
+ }
+
/**
- * Get the amount of resources for the specified type from the resource list
+ * Partition the existing set of resources into two groups, those remaining to be
+ * scheduled and those requested to be used for a new task.
+ * @param resources The full list of available resources
+ * @param resourceName The name of the resource to take from the available resources
+ * @param amountToUse The amount of resources to take from the available resources
+ * @return The remaining resources list and the used resources list.
*/
- protected def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
+ def partitionResources(
+ resources: JList[Resource],
+ resourceName: String,
+ amountToUse: Double): (List[Resource], List[Resource]) = {
+ var remain = amountToUse
+ var requestedResources = new ArrayBuffer[Resource]
+ val remainingResources = resources.map {
+ case r => {
+ if (remain > 0 &&
+ r.getType == Value.Type.SCALAR &&
+ r.getScalar.getValue > 0.0 &&
+ r.getName == resourceName) {
+ val usage = Math.min(remain, r.getScalar.getValue)
+ requestedResources += createResource(resourceName, usage, Some(r.getRole))
+ remain -= usage
+ createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+ } else {
+ r
+ }
+ }
}
- 0.0
+
+ // Filter any resource that has depleted.
+ val filteredResources =
+ remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
+
+ (filteredResources.toList, requestedResources.toList)
}
/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 3f1692917a..4b504df7b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.Matchers
@@ -60,7 +60,16 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
- mesosDriver = driver
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = driver
markRegistered()
}
backend.start()
@@ -80,6 +89,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("mesos supports killing and limiting executors") {
val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
@@ -87,7 +97,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
sparkConf.set("spark.driver.port", "1234")
val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc).toInt
+ val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
@@ -130,11 +140,12 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("mesos supports killing and relaunching tasks with executors") {
val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val backend = createSchedulerBackend(taskScheduler, driver)
- val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+ val minMem = backend.calculateTotalMemory(sc) + 1024
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index d01837fe78..5ed30f64d7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.Collections
+import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -60,14 +61,17 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val resources = List(
+ mesosSchedulerBackend.createResource("cpus", 4),
+ mesosSchedulerBackend.createResource("mem", 1024))
// uri is null.
- val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+ val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
assert(executorInfo.getCommand.getValue ===
s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
// uri exists.
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
- val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+ val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
assert(executorInfo1.getCommand.getValue ===
s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
}
@@ -93,7 +97,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
- val execInfo = backend.createExecutorInfo("mockExecutor")
+ val (execInfo, _) = backend.createExecutorInfo(
+ List(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
@@ -194,7 +199,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
)
verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
- assert(capture.getValue.size() == 1)
+ assert(capture.getValue.size() === 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
val cpus = taskInfo.getResourcesList.get(0)
@@ -214,4 +219,97 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
backend.resourceOffers(driver, mesosOffers2)
verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
+
+ test("can handle multiple roles") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(new SparkConf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val id = 1
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setRole("prod")
+ .setScalar(Scalar.newBuilder().setValue(500))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("prod")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(1))
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(600))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(2))
+ val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
+
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(offer)
+
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2 // Deducting 1 for executor
+ ))
+
+ val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+ val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ ).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName.equals("n1"))
+ assert(taskInfo.getResourcesCount === 1)
+ val cpusDev = taskInfo.getResourcesList.get(0)
+ assert(cpusDev.getName.equals("cpus"))
+ assert(cpusDev.getScalar.getValue.equals(1.0))
+ assert(cpusDev.getRole.equals("dev"))
+ val executorResources = taskInfo.getExecutor.getResourcesList
+ assert(executorResources.exists { r =>
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+ })
+ assert(executorResources.exists { r =>
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+ })
+ }
}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 1f915d8ea1..debdd2adf2 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -307,6 +307,28 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.principal</code></td>
+ <td>Framework principal to authenticate to Mesos</td>
+ <td>
+ Set the principal with which Spark framework will use to authenticate with Mesos.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.secret</code></td>
+ <td>Framework secret to authenticate to Mesos</td>
+ <td>
+ Set the secret with which Spark framework will use to authenticate with Mesos.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.role</code></td>
+ <td>Role for the Spark framework</td>
+ <td>
+ Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations
+ and resource weight sharing.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.constraints</code></td>
<td>Attribute based constraints to be matched against when accepting resource offers.</td>
<td>