aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorFelix Bechstein <felix.bechstein@otto.de>2015-11-09 13:36:14 -0800
committerAndrew Or <andrew@databricks.com>2015-11-09 13:36:14 -0800
commit5039a49b636325f321daa089971107003fae9d4b (patch)
tree00d735ec3e30eb2bf08ad2ec35d2d562bfef49fd /core
parent88a3fdcc783f880a8d01c7e194ec42fc114bdf8a (diff)
downloadspark-5039a49b636325f321daa089971107003fae9d4b.tar.gz
spark-5039a49b636325f321daa089971107003fae9d4b.tar.bz2
spark-5039a49b636325f321daa089971107003fae9d4b.zip
[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints
this change rejects offers for slaves with unmet constraints for 120s to mitigate offer starvation. this prevents mesos to send us these offers again and again. in return, we get more offers for slaves which might meet our constraints. and it enables mesos to send the rejected offers to other frameworks. Author: Felix Bechstein <felix.bechstein@otto.de> Closes #8639 from felixb/decline_offers_constraint_mismatch.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala92
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala4
3 files changed, 91 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f8e5..2de9b6a651 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+ // reject offers with mismatched constraints in seconds
+ private val rejectOfferDurationForUnmetConstraints =
+ getRejectOfferDurationForUnmetConstraints(sc)
+
// A client for talking to the external shuffle service, if it is a
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
- if (taskIdToSlaveId.size < executorLimit &&
- totalCoresAcquired < maxCores &&
- meetsConstraints &&
- mem >= calculateTotalMemory(sc) &&
- cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId.put(taskId, slaveId)
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- // Gather cpu resources from the available resources and use them in the task.
- val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.getResourcesList, "cpus", cpusToUse)
- val (_, memResourcesToUse) =
- partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
- val taskBuilder = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
- .setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+ if (meetsConstraints) {
+ if (taskIdToSlaveId.size < executorLimit &&
+ totalCoresAcquired < maxCores &&
+ mem >= calculateTotalMemory(sc) &&
+ cpus >= 1 &&
+ failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+ !slaveIdsWithExecutors.contains(slaveId)) {
+ // Launch an executor on the slave
+ val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ totalCoresAcquired += cpusToUse
+ val taskId = newMesosTaskId()
+ taskIdToSlaveId.put(taskId, slaveId)
+ slaveIdsWithExecutors += slaveId
+ coresByTaskId(taskId) = cpusToUse
+ // Gather cpu resources from the available resources and use them in the task.
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+ val (_, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+ val taskBuilder = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
+ .setName("Task " + taskId)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+ }
+
+ // Accept the offer and launch the task
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+ d.launchTasks(
+ Collections.singleton(offer.getId),
+ Collections.singleton(taskBuilder.build()), filters)
+ } else {
+ // Decline the offer
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.declineOffer(offer.getId)
}
-
- // accept the offer and launch the task
- logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
- d.launchTasks(
- Collections.singleton(offer.getId),
- Collections.singleton(taskBuilder.build()), filters)
} else {
- // Decline the offer
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- d.declineOffer(offer.getId)
+ // This offer does not meet constraints. We don't need to see it again.
+ // Decline the offer for a long period of time.
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+ + s" for $rejectOfferDurationForUnmetConstraints seconds")
+ d.declineOffer(offer.getId, Filters.newBuilder()
+ .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index aaffac604a..281965a598 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+ // reject offers with mismatched constraints in seconds
+ private val rejectOfferDurationForUnmetConstraints =
+ getRejectOfferDurationForUnmetConstraints(sc)
+
@volatile var appId: String = _
override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
- // Fail-fast on offers we know will be rejected
- val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+ // Fail first on offers with unmet constraints
+ val (offersMatchingConstraints, offersNotMatchingConstraints) =
+ offers.asScala.partition { o =>
+ val offerAttributes = toAttributeMap(o.getAttributesList)
+ val meetsConstraints =
+ matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+
+ // add some debug messaging
+ if (!meetsConstraints) {
+ val id = o.getId.getValue
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+ }
+
+ meetsConstraints
+ }
+
+ // These offers do not meet constraints. We don't need to see them again.
+ // Decline the offer for a long period of time.
+ offersNotMatchingConstraints.foreach { o =>
+ d.declineOffer(o.getId, Filters.newBuilder()
+ .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+ }
+
+ // Of the matching constraints, see which ones give us enough memory and cores
+ val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)
- // check if all constraints are satisfield
- // 1. Attribute constraints
- // 2. Memory requirements
- // 3. CPU requirements - need at least 1 for executor, 1 for task
- val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ // check offers for
+ // 1. Memory requirements
+ // 2. CPU requirements - need at least 1 for executor, 1 for task
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
-
val meetsRequirements =
- (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+ (meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
-
- // add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
- val id = o.getId.getValue
- logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+ + s"$offerAttributes mem: $mem cpu: $cpus")
meetsRequirements
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 860c8e097b..721861fbbc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
}
+ protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ }
+
}