aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastien Rainville <sebastien@hopper.com>2016-05-04 14:32:36 -0700
committerAndrew Or <andrew@databricks.com>2016-05-04 14:32:36 -0700
commiteb019af9a9cadb127eab1b6d30312169ed90f808 (patch)
tree1fb16c735594776ff511304160c705b9fd7cee97
parentcdce4e62a5674e2034e5d395578b1a60e3d8c435 (diff)
downloadspark-eb019af9a9cadb127eab1b6d30312169ed90f808.tar.gz
spark-eb019af9a9cadb127eab1b6d30312169ed90f808.tar.bz2
spark-eb019af9a9cadb127eab1b6d30312169ed90f808.zip
[SPARK-13001][CORE][MESOS] Prevent getting offers when reached max cores
Similar to https://github.com/apache/spark/pull/8639 This change rejects offers for 120s when reached `spark.cores.max` in coarse-grained mode to mitigate offer starvation. This prevents Mesos to send us offers again and again, starving other frameworks. This is especially problematic when running many small frameworks on the same Mesos cluster, e.g. many small Sparks streaming jobs, and cause the bigger spark jobs to stop receiving offers. By rejecting the offers for a long period of time, they become available to those other frameworks. Author: Sebastien Rainville <sebastien@hopper.com> Closes #10924 from sebastienrainville/master.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala53
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala13
3 files changed, 53 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 50b452c72f..2c5be1f528 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
- // reject offers with mismatched constraints in seconds
+ // Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)
+ // Reject offers when we reached the maximum number of cores for this framework
+ private val rejectOfferDurationForReachedMaxCores =
+ getRejectOfferDurationForReachedMaxCores(sc)
+
// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
}
private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
- for (offer <- offers) {
- val id = offer.getId.getValue
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus")
- val filters = Filters.newBuilder()
- .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
-
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
- + s" for $rejectOfferDurationForUnmetConstraints seconds")
+ offers.foreach { offer =>
+ declineOffer(d, offer, Some("unmet constraints"),
+ Some(rejectOfferDurationForUnmetConstraints))
+ }
+ }
- d.declineOffer(offer.getId, filters)
+ private def declineOffer(
+ d: SchedulerDriver,
+ offer: Offer,
+ reason: Option[String] = None,
+ refuseSeconds: Option[Long] = None): Unit = {
+
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
+ s" cpu: $cpus for $refuseSeconds seconds" +
+ reason.map(r => s" (reason: $r)").getOrElse(""))
+
+ refuseSeconds match {
+ case Some(seconds) =>
+ val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+ d.declineOffer(offer.getId, filters)
+ case _ => d.declineOffer(offer.getId)
}
}
@@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
d.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
- } else { // decline
- logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
- s"mem: $offerMem cpu: $offerCpus")
-
- d.declineOffer(offer.getId)
+ } else if (totalCoresAcquired >= maxCores) {
+ // Reject an offer for a configurable amount of time to avoid starving other frameworks
+ declineOffer(d, offer, Some("reached spark.cores.max"),
+ Some(rejectOfferDurationForReachedMaxCores))
+ } else {
+ declineOffer(d, offer)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1e322ac679..7355ba317d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}
+ protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index b18f0eb162..15d59e7052 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}
+ test("mesos declines offers with a filter when reached spark.cores.max") {
+ val maxCores = 3
+ setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List(
+ (executorMemory, maxCores + 1),
+ (executorMemory, maxCores + 1)))
+
+ verifyTaskLaunched("o1")
+ verifyDeclinedOffer(driver, createOfferId("o2"), true)
+ }
+
test("mesos assigns tasks round-robin on offers") {
val executorCores = 4
val maxCores = executorCores * 2