Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala   92
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala         48
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala            4
3 files changed, 91 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f8e5..2de9b6a651 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+ // How long to reject offers with mismatched constraints, in seconds
+ private val rejectOfferDurationForUnmetConstraints =
+ getRejectOfferDurationForUnmetConstraints(sc)
+
// A client for talking to the external shuffle service, if it is enabled
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
- if (taskIdToSlaveId.size < executorLimit &&
- totalCoresAcquired < maxCores &&
- meetsConstraints &&
- mem >= calculateTotalMemory(sc) &&
- cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId.put(taskId, slaveId)
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- // Gather cpu resources from the available resources and use them in the task.
- val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.getResourcesList, "cpus", cpusToUse)
- val (_, memResourcesToUse) =
- partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
- val taskBuilder = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
- .setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+ if (meetsConstraints) {
+ if (taskIdToSlaveId.size < executorLimit &&
+ totalCoresAcquired < maxCores &&
+ mem >= calculateTotalMemory(sc) &&
+ cpus >= 1 &&
+ failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+ !slaveIdsWithExecutors.contains(slaveId)) {
+ // Launch an executor on the slave
+ val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ totalCoresAcquired += cpusToUse
+ val taskId = newMesosTaskId()
+ taskIdToSlaveId.put(taskId, slaveId)
+ slaveIdsWithExecutors += slaveId
+ coresByTaskId(taskId) = cpusToUse
+ // Gather cpu resources from the available resources and use them in the task.
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+ val (_, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+ val taskBuilder = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
+ .setName("Task " + taskId)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+ }
+
+ // Accept the offer and launch the task
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+ d.launchTasks(
+ Collections.singleton(offer.getId),
+ Collections.singleton(taskBuilder.build()), filters)
+ } else {
+ // Decline the offer
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ d.declineOffer(offer.getId)
}
-
- // accept the offer and launch the task
- logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
- d.launchTasks(
- Collections.singleton(offer.getId),
- Collections.singleton(taskBuilder.build()), filters)
} else {
- // Decline the offer
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- d.declineOffer(offer.getId)
+ // This offer does not meet constraints. We don't need to see it again.
+ // Decline the offer for a long period of time.
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+ + s" for $rejectOfferDurationForUnmetConstraints seconds")
+ d.declineOffer(offer.getId, Filters.newBuilder()
+ .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}
}
}
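With this change the coarse-grained backend handles the two decline cases differently: offers that fail the attribute constraints are refused for a long, configurable period (spark.mesos.rejectOfferDurationForUnmetConstraints, default 120s), while offers that merely lack memory or cores are declined with the default filter so they can be re-offered soon. A minimal sketch of how an application might set the two configuration keys the patch reads; the master URL and the attribute names/values are illustrative assumptions, not taken from this commit:

// Illustrative only: attribute names/values and the master URL are assumptions.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://zk://zk-host:2181/mesos")
  .setAppName("constraint-example")
  // Only offers whose slave attributes match these key:value pairs are considered.
  .set("spark.mesos.constraints", "os:centos7;zone:us-east-1a")
  // Mismatched offers are refused for this long (default "120s", see MesosSchedulerUtils).
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "300s")

val sc = new SparkContext(conf)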
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index aaffac604a..281965a598 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+ // How long to reject offers with mismatched constraints, in seconds
+ private val rejectOfferDurationForUnmetConstraints =
+ getRejectOfferDurationForUnmetConstraints(sc)
+
@volatile var appId: String = _
override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
- // Fail-fast on offers we know will be rejected
- val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+ // Fail first on offers with unmet constraints
+ val (offersMatchingConstraints, offersNotMatchingConstraints) =
+ offers.asScala.partition { o =>
+ val offerAttributes = toAttributeMap(o.getAttributesList)
+ val meetsConstraints =
+ matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+
+ // add some debug messaging
+ if (!meetsConstraints) {
+ val id = o.getId.getValue
+ logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+ }
+
+ meetsConstraints
+ }
+
+ // These offers do not meet constraints. We don't need to see them again.
+ // Decline the offer for a long period of time.
+ offersNotMatchingConstraints.foreach { o =>
+ d.declineOffer(o.getId, Filters.newBuilder()
+ .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+ }
+
+ // Of the offers matching the constraints, see which ones give us enough memory and cores
+ val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)
- // check if all constraints are satisfield
- // 1. Attribute constraints
- // 2. Memory requirements
- // 3. CPU requirements - need at least 1 for executor, 1 for task
- val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+ // check offers for
+ // 1. Memory requirements
+ // 2. CPU requirements - need at least 1 for executor, 1 for task
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
-
val meetsRequirements =
- (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+ (meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
-
- // add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
- val id = o.getId.getValue
- logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+ + s"$offerAttributes mem: $mem cpu: $cpus")
meetsRequirements
}
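In the fine-grained backend the same idea appears as a two-stage partition: offers are first split by constraint match (and the mismatches declined with the long refuse filter), then the remainder is split by memory and CPU. A self-contained sketch of that pattern, using a stand-in Offer case class rather than the Mesos protobuf type:

// Stand-in types, not the Mesos API; illustrates the two-stage filtering only.
case class Offer(id: String, attributes: Map[String, String], mem: Double, cpus: Double)

def meetsConstraints(o: Offer, required: Map[String, String]): Boolean =
  required.forall { case (k, v) => o.attributes.get(k).contains(v) }

// Returns (usable, lacking resources, unmet constraints).
def triage(offers: Seq[Offer], required: Map[String, String],
           minMem: Double, minCpus: Double): (Seq[Offer], Seq[Offer], Seq[Offer]) = {
  // Stage 1: constraints; failures here are declined for a long period.
  val (matching, unmet) = offers.partition(meetsConstraints(_, required))
  // Stage 2: resources; failures here are declined with the default filter.
  val (usable, lacking) = matching.partition(o => o.mem >= minMem && o.cpus >= minCpus)
  (usable, lacking, unmet)
}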
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 860c8e097b..721861fbbc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
}
+ protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ }
+
}
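The new helper reads the duration through SparkConf.getTimeAsSeconds, so the setting accepts the usual Spark time suffixes. A small hedged example of how the value resolves; the "2m" input is an arbitrary assumption for illustration:

// Illustration of how the new setting is resolved; "2m" is an example value, not a recommendation.
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.mesos.rejectOfferDurationForUnmetConstraints", "2m")
// Suffixed values such as "120s" or "2m" are converted to seconds; here seconds == 120.
val seconds = conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")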