aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala62
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala153
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala46
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala140
-rw-r--r--docs/running-on-mesos.md22
9 files changed, 376 insertions, 128 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6b8edca5aa..b68f8c7685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,18 +18,18 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(
val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
+ // Offer constraints
+ private val slaveOfferConstraints =
+ parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
var nextMesosTaskId = 0
@volatile var appId: String = _
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-
for (offer <- offers) {
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores &&
- mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ val id = offer.getId.getValue
+ if (meetsConstraints &&
+ totalCoresAcquired < maxCores &&
+ mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem",
- MemoryUtils.calculateTotalMemory(sc)))
+ .addResources(createResource("mem", calculateTotalMemory(sc)))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+ .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
}
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
+ // accept the offer and launch the task
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.launchTasks(List(offer.getId), List(task.build()), filters)
} else {
- // Filter it out
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
+ // Decline the offer
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.declineOffer(offer.getId)
}
}
}
}
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
deleted file mode 100644
index 8df4f3b554..0000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.apache.spark.SparkContext
-
-private[spark] object MemoryUtils {
- // These defaults copied from YARN
- val OVERHEAD_FRACTION = 0.10
- val OVERHEAD_MINIMUM = 384
-
- def calculateTotalMemory(sc: SparkContext): Int = {
- sc.conf.getInt("spark.mesos.executor.memoryOverhead",
- math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1067a7f1ca..d3a20f8221 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
+
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 49de85ef48..d72e2af456 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
+import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkException, TaskState}
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
+ // Offer constraints
+ private[this] val slaveOfferConstraints =
+ parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
@volatile var appId: String = _
override def start() {
@@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
+ throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+ }
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
@@ -115,14 +119,14 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
- .setValue(mesosExecutorCores).build())
+ .setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
- .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+ .setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -191,13 +195,31 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- (mem >= MemoryUtils.calculateTotalMemory(sc) &&
- // need at least 1 for executor, 1 for task
- cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
- (slaveIdsWithExecutors.contains(slaveId) &&
- cpus >= scheduler.CPUS_PER_TASK)
+ val offerAttributes = toAttributeMap(o.getAttributesList)
+
+ // check if all constraints are satisfield
+ // 1. Attribute constraints
+ // 2. Memory requirements
+ // 3. CPU requirements - need at least 1 for executor, 1 for task
+ val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+ val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+ val meetsRequirements =
+ (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+ (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+
+ // add some debug messaging
+ val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+ val id = o.getId.getValue
+ logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+
+ meetsRequirements
}
+ // Decline offers we ruled out immediately
+ unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
@@ -223,15 +245,15 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
- offer.foreach { taskDesc =>
- val slaveId = taskDesc.executorId
- slaveIdsWithExecutors += slaveId
- slavesIdsOfAcceptedOffers += slaveId
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
- .add(createMesosTask(taskDesc, slaveId))
- }
+ offer.foreach { taskDesc =>
+ val slaveId = taskDesc.executorId
+ slaveIdsWithExecutors += slaveId
+ slavesIdsOfAcceptedOffers += slaveId
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+ .add(createMesosTask(taskDesc, slaveId))
}
+ }
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -251,8 +273,6 @@ private[spark] class MesosSchedulerBackend(
d.declineOffer(o.getId)
}
- // Decline offers we ruled out immediately
- unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d11228f3d0..d8a8c848bb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,14 +17,17 @@
package org.apache.spark.scheduler.cluster.mesos
-import java.util.List
+import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
-import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
-import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
-import org.apache.spark.Logging
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.Protos._
+import org.apache.mesos.protobuf.GeneratedMessage
+import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.util.Utils
/**
@@ -86,10 +89,150 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Get the amount of resources for the specified type from the resource list
*/
- protected def getResource(res: List[Resource], name: String): Double = {
+ protected def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0.0
}
+
+ /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
+ protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+ (attr.getName, attr.getText.getValue.split(',').toSet)
+ }
+
+
+ /** Build a Mesos resource protobuf object */
+ protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+ Resource.newBuilder()
+ .setName(resourceName)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+ .build()
+ }
+
+ /**
+ * Converts the attributes from the resource offer into a Map of name -> Attribute Value
+ * The attribute values are the mesos attribute types and they are
+ * @param offerAttributes
+ * @return
+ */
+ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+ offerAttributes.map(attr => {
+ val attrValue = attr.getType match {
+ case Value.Type.SCALAR => attr.getScalar
+ case Value.Type.RANGES => attr.getRanges
+ case Value.Type.SET => attr.getSet
+ case Value.Type.TEXT => attr.getText
+ }
+ (attr.getName, attrValue)
+ }).toMap
+ }
+
+
+ /**
+ * Match the requirements (if any) to the offer attributes.
+ * if attribute requirements are not specified - return true
+ * else if attribute is defined and no values are given, simple attribute presence is performed
+ * else if attribute name and value is specified, subset match is performed on slave attributes
+ */
+ def matchesAttributeRequirements(
+ slaveOfferConstraints: Map[String, Set[String]],
+ offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+ slaveOfferConstraints.forall {
+ // offer has the required attribute and subsumes the required values for that attribute
+ case (name, requiredValues) =>
+ offerAttributes.get(name) match {
+ case None => false
+ case Some(_) if requiredValues.isEmpty => true // empty value matches presence
+ case Some(scalarValue: Value.Scalar) =>
+ // check if provided values is less than equal to the offered values
+ requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+ case Some(rangeValue: Value.Range) =>
+ val offerRange = rangeValue.getBegin to rangeValue.getEnd
+ // Check if there is some required value that is between the ranges specified
+ // Note: We only support the ability to specify discrete values, in the future
+ // we may expand it to subsume ranges specified with a XX..YY value or something
+ // similar to that.
+ requiredValues.map(_.toLong).exists(offerRange.contains(_))
+ case Some(offeredValue: Value.Set) =>
+ // check if the specified required values is a subset of offered set
+ requiredValues.subsetOf(offeredValue.getItemList.toSet)
+ case Some(textValue: Value.Text) =>
+ // check if the specified value is equal, if multiple values are specified
+ // we succeed if any of them match.
+ requiredValues.contains(textValue.getValue)
+ }
+ }
+ }
+
+ /**
+ * Parses the attributes constraints provided to spark and build a matching data struct:
+ * Map[<attribute-name>, Set[values-to-match]]
+ * The constraints are specified as ';' separated key-value pairs where keys and values
+ * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+ * multiple values (comma separated). For example:
+ * {{{
+ * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+ * // would result in
+ * <code>
+ * Map(
+ * "tachyon" -> Set("true"),
+ * "zone": -> Set("us-east-1a", "us-east-1b")
+ * )
+ * }}}
+ *
+ * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
+ * https://github.com/apache/mesos/blob/master/src/common/values.cpp
+ * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+ *
+ * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated
+ * by ':')
+ * @return Map of constraints to match resources offers.
+ */
+ def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+ /*
+ Based on mesos docs:
+ attributes : attribute ( ";" attribute )*
+ attribute : labelString ":" ( labelString | "," )+
+ labelString : [a-zA-Z0-9_/.-]
+ */
+ val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+ // kv splitter
+ if (constraintsVal.isEmpty) {
+ Map()
+ } else {
+ try {
+ Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
+ case (k, v) =>
+ if (v == null || v.isEmpty) {
+ (k, Set[String]())
+ } else {
+ (k, v.split(',').toSet)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
+ }
+ }
+ }
+
+ // These defaults copied from YARN
+ private val MEMORY_OVERHEAD_FRACTION = 0.10
+ private val MEMORY_OVERHEAD_MINIMUM = 384
+
+ /**
+ * Return the amount of memory to allocate to each executor, taking into account
+ * container overheads.
+ * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+ * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
+ * (whichever is larger)
+ */
+ def calculateTotalMemory(sc: SparkContext): Int = {
+ sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+ math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+ sc.executorMemory
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
deleted file mode 100644
index e72285d03d..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
- test("MesosMemoryUtils should always override memoryOverhead when it's set") {
- val sparkConf = new SparkConf
-
- val sc = mock[SparkContext]
- when(sc.conf).thenReturn(sparkConf)
-
- // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
- when(sc.executorMemory).thenReturn(512)
- assert(MemoryUtils.calculateTotalMemory(sc) === 896)
-
- // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
- when(sc.executorMemory).thenReturn(4096)
- assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
-
- // set memoryOverhead
- sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
- assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
- sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
- assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 68df46a41d..d01837fe78 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)
- val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
@@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000000..b354914b6f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.Value
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+ // scalastyle:off structural.type
+ // this is the documented way of generating fixtures in scalatest
+ def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+ val sparkConf = new SparkConf
+ val sc = mock[SparkContext]
+ when(sc.conf).thenReturn(sparkConf)
+ }
+ val utils = new MesosSchedulerUtils { }
+ // scalastyle:on structural.type
+
+ test("use at-least minimum overhead") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(512)
+ utils.calculateTotalMemory(f.sc) shouldBe 896
+ }
+
+ test("use overhead if it is greater than minimum value") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(4096)
+ utils.calculateTotalMemory(f.sc) shouldBe 4505
+ }
+
+ test("use spark.mesos.executor.memoryOverhead (if set)") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(1024)
+ f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+ utils.calculateTotalMemory(f.sc) shouldBe 1536
+ }
+
+ test("parse a non-empty constraint string correctly") {
+ val expectedMap = Map(
+ "tachyon" -> Set("true"),
+ "zone" -> Set("us-east-1a", "us-east-1b")
+ )
+ utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
+ }
+
+ test("parse an empty constraint string correctly") {
+ utils.parseConstraintString("") shouldBe Map()
+ }
+
+ test("throw an exception when the input is malformed") {
+ an[IllegalArgumentException] should be thrownBy
+ utils.parseConstraintString("tachyon;zone:us-east")
+ }
+
+ test("empty values for attributes' constraints matches all values") {
+ val constraintsStr = "tachyon:"
+ val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+ parsedConstraints shouldBe Map("tachyon" -> Set())
+
+ val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+ val noTachyonOffer = Map("zone" -> zoneSet)
+ val tachyonTrueOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+ val tachyonFalseOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("false").build())
+
+ utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false
+ utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true
+ utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true
+ }
+
+ test("subset match is performed for set attributes") {
+ val supersetConstraint = Map(
+ "tachyon" -> Value.Text.newBuilder().setValue("true").build(),
+ "zone" -> Value.Set.newBuilder()
+ .addItem("us-east-1a")
+ .addItem("us-east-1b")
+ .addItem("us-east-1c")
+ .build())
+
+ val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
+ val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+ utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+ }
+
+ test("less than equal match is performed on scalar attributes") {
+ val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+ val ltConstraint = utils.parseConstraintString("gpus:2")
+ val eqConstraint = utils.parseConstraintString("gpus:3")
+ val gtConstraint = utils.parseConstraintString("gpus:4")
+
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ }
+
+ test("contains match is performed for range attributes") {
+ val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+ val ltConstraint = utils.parseConstraintString("ports:6000")
+ val eqConstraint = utils.parseConstraintString("ports:7500")
+ val gtConstraint = utils.parseConstraintString("ports:8002")
+ val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+ }
+
+ test("equality match is performed for text attributes") {
+ val offerAttribs = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+
+ val trueConstraint = utils.parseConstraintString("tachyon:true")
+ val falseConstraint = utils.parseConstraintString("tachyon:false")
+
+ utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+ }
+
+}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5f1d6daeb2..1f915d8ea1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using `conf.set("spark.cores.max", "10")` (for example).
+You may also make use of `spark.mesos.constraints` to set attribute based constraints on mesos resource offers. By default, all resource offers will be accepted.
+
+{% highlight scala %}
+conf.set("spark.mesos.constraints", "tachyon=true;us-east-1=false")
+{% endhighlight %}
+
+For example, Let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
+
# Mesos Docker Support
Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
@@ -298,6 +306,20 @@ See the [configuration page](configuration.html) for information on Spark config
the final overhead will be this value.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.constraints</code></td>
+ <td>Attribute based constraints to be matched against when accepting resource offers.</td>
+ <td>
+ Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+ <ul>
+ <li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
+ <li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
+ <li>Set constraints are matched with "subset of" semantics i.e. value in the constraint must be a subset of the resource offer's value.</li>
+ <li>Text constraints are metched with "equality" semantics i.e. value in the constraint must be exactly equal to the resource offer's value.</li>
+ <li>In case there is no value present as a part of the constraint any offer with the corresponding attribute will be accepted (without value check).</li>
+ </ul>
+ </td>
+</tr>
</table>
# Troubleshooting and Debugging