aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Chauhan <achauhan@brightcove.com>2015-07-06 16:04:57 -0700
committerAndrew Or <andrew@databricks.com>2015-07-06 16:04:57 -0700
commit1165b17d24cdf1dbebb2faca14308dfe5c2a652c (patch)
treee3f1508fadb7e5a70b3039a707918227459fda96
parent9ff203346ca4decf2999e33bfb8c400ec75313e6 (diff)
downloadspark-1165b17d24cdf1dbebb2faca14308dfe5c2a652c.tar.gz
spark-1165b17d24cdf1dbebb2faca14308dfe5c2a652c.tar.bz2
spark-1165b17d24cdf1dbebb2faca14308dfe5c2a652c.zip
[SPARK-6707] [CORE] [MESOS] Mesos Scheduler should allow the user to specify constraints based on slave attributes
Currently, the mesos scheduler only looks at the 'cpu' and 'mem' resources when trying to determine the usablility of a resource offer from a mesos slave node. It may be preferable for the user to be able to ensure that the spark jobs are only started on a certain set of nodes (based on attributes). For example, If the user sets a property, let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors. Author: Ankur Chauhan <achauhan@brightcove.com> Closes #5563 from ankurcha/mesos_attribs and squashes the following commits: 902535b [Ankur Chauhan] Fix line length d83801c [Ankur Chauhan] Update code as per code review comments 8b73f2d [Ankur Chauhan] Fix imports c3523e7 [Ankur Chauhan] Added docs 1a24d0b [Ankur Chauhan] Expand scope of attributes matching to include all data types 482fd71 [Ankur Chauhan] Update access modifier to private[this] for offer constraints 5ccc32d [Ankur Chauhan] Fix nit pick whitespace 1bce782 [Ankur Chauhan] Fix nit pick whitespace c0cbc75 [Ankur Chauhan] Use offer id value for debug message 7fee0ea [Ankur Chauhan] Add debug statements fc7eb5b [Ankur Chauhan] Fix import codestyle 00be252 [Ankur Chauhan] Style changes as per code review comments 662535f [Ankur Chauhan] Incorporate code review comments + use SparkFunSuite fdc0937 [Ankur Chauhan] Decline offers that did not meet criteria 67b58a0 [Ankur Chauhan] Add documentation for spark.mesos.constraints 63f53f4 [Ankur Chauhan] Update codestyle - uniform style for config values 02031e4 [Ankur Chauhan] Fix scalastyle warnings in tests c09ed84 [Ankur Chauhan] Fixed the access modifier on offerConstraints val to private[mesos] 0c64df6 [Ankur Chauhan] Rename overhead fractions to memory_*, fix spacing 8cc1e8f [Ankur Chauhan] Make exception message more explicit about the source of the error addedba [Ankur Chauhan] Added test case for malformed constraint string ec9d9a6 [Ankur Chauhan] Add tests for parse constraint string 72fe88a [Ankur Chauhan] Fix up tests + remove redundant method override, combine utility class into new mesos scheduler util trait 92b47fd [Ankur Chauhan] Add attributes based constraints support to MesosScheduler
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala62
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala153
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala46
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala140
-rw-r--r--docs/running-on-mesos.md22
9 files changed, 376 insertions, 128 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6b8edca5aa..b68f8c7685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,18 +18,18 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(
val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
+ // Offer constraints
+ private val slaveOfferConstraints =
+ parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
var nextMesosTaskId = 0
@volatile var appId: String = _
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-
for (offer <- offers) {
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores &&
- mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ val id = offer.getId.getValue
+ if (meetsConstraints &&
+ totalCoresAcquired < maxCores &&
+ mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem",
- MemoryUtils.calculateTotalMemory(sc)))
+ .addResources(createResource("mem", calculateTotalMemory(sc)))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+ .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
}
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
+ // accept the offer and launch the task
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.launchTasks(List(offer.getId), List(task.build()), filters)
} else {
- // Filter it out
- d.launchTasks(
- Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
+ // Decline the offer
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.declineOffer(offer.getId)
}
}
}
}
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
deleted file mode 100644
index 8df4f3b554..0000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.apache.spark.SparkContext
-
-private[spark] object MemoryUtils {
- // These defaults copied from YARN
- val OVERHEAD_FRACTION = 0.10
- val OVERHEAD_MINIMUM = 384
-
- def calculateTotalMemory(sc: SparkContext): Int = {
- sc.conf.getInt("spark.mesos.executor.memoryOverhead",
- math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1067a7f1ca..d3a20f8221 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
+
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 49de85ef48..d72e2af456 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
+import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkException, TaskState}
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
+ // Offer constraints
+ private[this] val slaveOfferConstraints =
+ parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
@volatile var appId: String = _
override def start() {
@@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
+ throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+ }
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
@@ -115,14 +119,14 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
- .setValue(mesosExecutorCores).build())
+ .setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
- .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+ .setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -191,13 +195,31 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- (mem >= MemoryUtils.calculateTotalMemory(sc) &&
- // need at least 1 for executor, 1 for task
- cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
- (slaveIdsWithExecutors.contains(slaveId) &&
- cpus >= scheduler.CPUS_PER_TASK)
+ val offerAttributes = toAttributeMap(o.getAttributesList)
+
+ // check if all constraints are satisfield
+ // 1. Attribute constraints
+ // 2. Memory requirements
+ // 3. CPU requirements - need at least 1 for executor, 1 for task
+ val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+ val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+ val meetsRequirements =
+ (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+ (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+
+ // add some debug messaging
+ val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+ val id = o.getId.getValue
+ logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+
+ meetsRequirements
}
+ // Decline offers we ruled out immediately
+ unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
@@ -223,15 +245,15 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
- offer.foreach { taskDesc =>
- val slaveId = taskDesc.executorId
- slaveIdsWithExecutors += slaveId
- slavesIdsOfAcceptedOffers += slaveId
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
- .add(createMesosTask(taskDesc, slaveId))
- }
+ offer.foreach { taskDesc =>
+ val slaveId = taskDesc.executorId
+ slaveIdsWithExecutors += slaveId
+ slavesIdsOfAcceptedOffers += slaveId
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+ .add(createMesosTask(taskDesc, slaveId))
}
+ }
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -251,8 +273,6 @@ private[spark] class MesosSchedulerBackend(
d.declineOffer(o.getId)
}
- // Decline offers we ruled out immediately
- unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d11228f3d0..d8a8c848bb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,14 +17,17 @@
package org.apache.spark.scheduler.cluster.mesos
-import java.util.List
+import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
-import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
-import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
-import org.apache.spark.Logging
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.Protos._
+import org.apache.mesos.protobuf.GeneratedMessage
+import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.util.Utils
/**
@@ -86,10 +89,150 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Get the amount of resources for the specified type from the resource list
*/
- protected def getResource(res: List[Resource], name: String): Double = {
+ protected def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0.0
}
+
+ /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
+ protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+ (attr.getName, attr.getText.getValue.split(',').toSet)
+ }
+
+
+ /** Build a Mesos resource protobuf object */
+ protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+ Resource.newBuilder()
+ .setName(resourceName)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+ .build()
+ }
+
+ /**
+ * Converts the attributes from the resource offer into a Map of name -> Attribute Value
+ * The attribute values are the mesos attribute types and they are
+ * @param offerAttributes
+ * @return
+ */
+ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+ offerAttributes.map(attr => {
+ val attrValue = attr.getType match {
+ case Value.Type.SCALAR => attr.getScalar
+ case Value.Type.RANGES => attr.getRanges
+ case Value.Type.SET => attr.getSet
+ case Value.Type.TEXT => attr.getText
+ }
+ (attr.getName, attrValue)
+ }).toMap
+ }
+
+
+ /**
+ * Match the requirements (if any) to the offer attributes.
+ * if attribute requirements are not specified - return true
+ * else if attribute is defined and no values are given, simple attribute presence is performed
+ * else if attribute name and value is specified, subset match is performed on slave attributes
+ */
+ def matchesAttributeRequirements(
+ slaveOfferConstraints: Map[String, Set[String]],
+ offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+ slaveOfferConstraints.forall {
+ // offer has the required attribute and subsumes the required values for that attribute
+ case (name, requiredValues) =>
+ offerAttributes.get(name) match {
+ case None => false
+ case Some(_) if requiredValues.isEmpty => true // empty value matches presence
+ case Some(scalarValue: Value.Scalar) =>
+ // check if provided values is less than equal to the offered values
+ requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+ case Some(rangeValue: Value.Range) =>
+ val offerRange = rangeValue.getBegin to rangeValue.getEnd
+ // Check if there is some required value that is between the ranges specified
+ // Note: We only support the ability to specify discrete values, in the future
+ // we may expand it to subsume ranges specified with a XX..YY value or something
+ // similar to that.
+ requiredValues.map(_.toLong).exists(offerRange.contains(_))
+ case Some(offeredValue: Value.Set) =>
+ // check if the specified required values is a subset of offered set
+ requiredValues.subsetOf(offeredValue.getItemList.toSet)
+ case Some(textValue: Value.Text) =>
+ // check if the specified value is equal, if multiple values are specified
+ // we succeed if any of them match.
+ requiredValues.contains(textValue.getValue)
+ }
+ }
+ }
+
+ /**
+ * Parses the attributes constraints provided to spark and build a matching data struct:
+ * Map[<attribute-name>, Set[values-to-match]]
+ * The constraints are specified as ';' separated key-value pairs where keys and values
+ * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+ * multiple values (comma separated). For example:
+ * {{{
+ * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+ * // would result in
+ * <code>
+ * Map(
+ * "tachyon" -> Set("true"),
+ * "zone": -> Set("us-east-1a", "us-east-1b")
+ * )
+ * }}}
+ *
+ * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
+ * https://github.com/apache/mesos/blob/master/src/common/values.cpp
+ * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+ *
+ * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated
+ * by ':')
+ * @return Map of constraints to match resources offers.
+ */
+ def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+ /*
+ Based on mesos docs:
+ attributes : attribute ( ";" attribute )*
+ attribute : labelString ":" ( labelString | "," )+
+ labelString : [a-zA-Z0-9_/.-]
+ */
+ val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+ // kv splitter
+ if (constraintsVal.isEmpty) {
+ Map()
+ } else {
+ try {
+ Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
+ case (k, v) =>
+ if (v == null || v.isEmpty) {
+ (k, Set[String]())
+ } else {
+ (k, v.split(',').toSet)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
+ }
+ }
+ }
+
+ // These defaults copied from YARN
+ private val MEMORY_OVERHEAD_FRACTION = 0.10
+ private val MEMORY_OVERHEAD_MINIMUM = 384
+
+ /**
+ * Return the amount of memory to allocate to each executor, taking into account
+ * container overheads.
+ * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+ * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
+ * (whichever is larger)
+ */
+ def calculateTotalMemory(sc: SparkContext): Int = {
+ sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+ math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+ sc.executorMemory
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
deleted file mode 100644
index e72285d03d..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
- test("MesosMemoryUtils should always override memoryOverhead when it's set") {
- val sparkConf = new SparkConf
-
- val sc = mock[SparkContext]
- when(sc.conf).thenReturn(sparkConf)
-
- // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
- when(sc.executorMemory).thenReturn(512)
- assert(MemoryUtils.calculateTotalMemory(sc) === 896)
-
- // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
- when(sc.executorMemory).thenReturn(4096)
- assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
-
- // set memoryOverhead
- sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
- assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
- sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
- assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 68df46a41d..d01837fe78 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)
- val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+ val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
@@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000000..b354914b6f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.Value
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+ // scalastyle:off structural.type
+ // this is the documented way of generating fixtures in scalatest
+ def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+ val sparkConf = new SparkConf
+ val sc = mock[SparkContext]
+ when(sc.conf).thenReturn(sparkConf)
+ }
+ val utils = new MesosSchedulerUtils { }
+ // scalastyle:on structural.type
+
+ test("use at-least minimum overhead") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(512)
+ utils.calculateTotalMemory(f.sc) shouldBe 896
+ }
+
+ test("use overhead if it is greater than minimum value") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(4096)
+ utils.calculateTotalMemory(f.sc) shouldBe 4505
+ }
+
+ test("use spark.mesos.executor.memoryOverhead (if set)") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(1024)
+ f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+ utils.calculateTotalMemory(f.sc) shouldBe 1536
+ }
+
+ test("parse a non-empty constraint string correctly") {
+ val expectedMap = Map(
+ "tachyon" -> Set("true"),
+ "zone" -> Set("us-east-1a", "us-east-1b")
+ )
+ utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
+ }
+
+ test("parse an empty constraint string correctly") {
+ utils.parseConstraintString("") shouldBe Map()
+ }
+
+ test("throw an exception when the input is malformed") {
+ an[IllegalArgumentException] should be thrownBy
+ utils.parseConstraintString("tachyon;zone:us-east")
+ }
+
+ test("empty values for attributes' constraints matches all values") {
+ val constraintsStr = "tachyon:"
+ val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+ parsedConstraints shouldBe Map("tachyon" -> Set())
+
+ val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+ val noTachyonOffer = Map("zone" -> zoneSet)
+ val tachyonTrueOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+ val tachyonFalseOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("false").build())
+
+ utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false
+ utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true
+ utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true
+ }
+
+ test("subset match is performed for set attributes") {
+ val supersetConstraint = Map(
+ "tachyon" -> Value.Text.newBuilder().setValue("true").build(),
+ "zone" -> Value.Set.newBuilder()
+ .addItem("us-east-1a")
+ .addItem("us-east-1b")
+ .addItem("us-east-1c")
+ .build())
+
+ val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
+ val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+ utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+ }
+
+ test("less than equal match is performed on scalar attributes") {
+ val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+ val ltConstraint = utils.parseConstraintString("gpus:2")
+ val eqConstraint = utils.parseConstraintString("gpus:3")
+ val gtConstraint = utils.parseConstraintString("gpus:4")
+
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ }
+
+ test("contains match is performed for range attributes") {
+ val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+ val ltConstraint = utils.parseConstraintString("ports:6000")
+ val eqConstraint = utils.parseConstraintString("ports:7500")
+ val gtConstraint = utils.parseConstraintString("ports:8002")
+ val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+ }
+
+ test("equality match is performed for text attributes") {
+ val offerAttribs = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+
+ val trueConstraint = utils.parseConstraintString("tachyon:true")
+ val falseConstraint = utils.parseConstraintString("tachyon:false")
+
+ utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+ }
+
+}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5f1d6daeb2..1f915d8ea1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using `conf.set("spark.cores.max", "10")` (for example).
+You may also make use of `spark.mesos.constraints` to set attribute based constraints on mesos resource offers. By default, all resource offers will be accepted.
+
+{% highlight scala %}
+conf.set("spark.mesos.constraints", "tachyon=true;us-east-1=false")
+{% endhighlight %}
+
+For example, Let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
+
# Mesos Docker Support
Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
@@ -298,6 +306,20 @@ See the [configuration page](configuration.html) for information on Spark config
the final overhead will be this value.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.constraints</code></td>
+ <td>Attribute based constraints to be matched against when accepting resource offers.</td>
+ <td>
+ Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+ <ul>
+ <li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
+ <li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
+ <li>Set constraints are matched with "subset of" semantics i.e. value in the constraint must be a subset of the resource offer's value.</li>
+ <li>Text constraints are metched with "equality" semantics i.e. value in the constraint must be exactly equal to the resource offer's value.</li>
+ <li>In case there is no value present as a part of the constraint any offer with the corresponding attribute will be accepted (without value check).</li>
+ </ul>
+ </td>
+</tr>
</table>
# Troubleshooting and Debugging