aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala53
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala13
3 files changed, 53 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 50b452c72f..2c5be1f528 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
- // reject offers with mismatched constraints in seconds
+ // Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)
+ // Reject offers when we reached the maximum number of cores for this framework
+ private val rejectOfferDurationForReachedMaxCores =
+ getRejectOfferDurationForReachedMaxCores(sc)
+
// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
}
private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
- for (offer <- offers) {
- val id = offer.getId.getValue
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus")
- val filters = Filters.newBuilder()
- .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
-
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
- + s" for $rejectOfferDurationForUnmetConstraints seconds")
+ offers.foreach { offer =>
+ declineOffer(d, offer, Some("unmet constraints"),
+ Some(rejectOfferDurationForUnmetConstraints))
+ }
+ }
- d.declineOffer(offer.getId, filters)
+ private def declineOffer(
+ d: SchedulerDriver,
+ offer: Offer,
+ reason: Option[String] = None,
+ refuseSeconds: Option[Long] = None): Unit = {
+
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
+ s" cpu: $cpus for $refuseSeconds seconds" +
+ reason.map(r => s" (reason: $r)").getOrElse(""))
+
+ refuseSeconds match {
+ case Some(seconds) =>
+ val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+ d.declineOffer(offer.getId, filters)
+ case _ => d.declineOffer(offer.getId)
}
}
@@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
d.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
- } else { // decline
- logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
- s"mem: $offerMem cpu: $offerCpus")
-
- d.declineOffer(offer.getId)
+ } else if (totalCoresAcquired >= maxCores) {
+ // Reject an offer for a configurable amount of time to avoid starving other frameworks
+ declineOffer(d, offer, Some("reached spark.cores.max"),
+ Some(rejectOfferDurationForReachedMaxCores))
+ } else {
+ declineOffer(d, offer)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1e322ac679..7355ba317d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}
+ protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index b18f0eb162..15d59e7052 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}
+ test("mesos declines offers with a filter when reached spark.cores.max") {
+ val maxCores = 3
+ setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List(
+ (executorMemory, maxCores + 1),
+ (executorMemory, maxCores + 1)))
+
+ verifyTaskLaunched("o1")
+ verifyDeclinedOffer(driver, createOfferId("o2"), true)
+ }
+
test("mesos assigns tasks round-robin on offers") {
val executorCores = 4
val maxCores = executorCores * 2