aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorJongyoul Lee <jongyoul@gmail.com>2014-11-24 19:14:14 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-24 19:21:02 -0800
commit10e433919a9a3520007099a3876b47f74c046f12 (patch)
treed54f8857053c157bd034bed9c17d6de287b07d7a /core/src/main
parente7b8bf067a2606e381f2081db95d9c613391afef (diff)
downloadspark-10e433919a9a3520007099a3876b47f74c046f12.tar.gz
spark-10e433919a9a3520007099a3876b47f74c046f12.tar.bz2
spark-10e433919a9a3520007099a3876b47f74c046f12.zip
[SPARK-4525] Mesos should decline unused offers
Functionally, this is just a small change on top of #3393 (by jongyoul). The issue being addressed is discussed in the comments there. I have not yet added a test for the bug there. I will add one shortly. I've also done some minor renaming/clean-up of variables in this class and tests. Author: Patrick Wendell <pwendell@gmail.com> Author: Jongyoul Lee <jongyoul@gmail.com> Closes #3436 from pwendell/mesos-issue and squashes the following commits: 58c35b5 [Patrick Wendell] Adding unit test for this situation c4f0697 [Patrick Wendell] Additional clean-up and fixes on top of existing fix f20f1b3 [Jongyoul Lee] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers - Added code for declining unused offers among acceptedOffers - Edited testCase for checking declining unused offers (cherry picked from commit b043c27424d05e3200e7ba99a1a65656b57fa2f0) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala25
1 files changed, 19 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d13795186c..10e6886c16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -208,10 +208,12 @@ private[spark] class MesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
- val (acceptedOffers, declinedOffers) = offers.partition { o =>
+ // Fail-fast on offers we know will be rejected
+ val (usableOffers, unUsableOffers) = offers.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
+ // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= 2 * scheduler.CPUS_PER_TASK) ||
@@ -219,11 +221,12 @@ private[spark] class MesosSchedulerBackend(
cpus >= scheduler.CPUS_PER_TASK)
}
- val offerableWorkers = acceptedOffers.map { o =>
+ val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the executor doesn't exist yet, subtract CPU for executor
+ // TODO(pwendell): Should below just subtract "1"?
getResource(o.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK
}
@@ -233,17 +236,20 @@ private[spark] class MesosSchedulerBackend(
cpus)
}
- val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
+ val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
+ val slavesIdsOfAcceptedOffers = HashSet[String]()
+
// Call into the TaskSchedulerImpl
- scheduler.resourceOffers(offerableWorkers)
- .filter(!_.isEmpty)
+ val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
+ acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
+ slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
@@ -257,7 +263,14 @@ private[spark] class MesosSchedulerBackend(
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
- declinedOffers.foreach(o => d.declineOffer(o.getId))
+ // Decline offers that weren't used
+ // NOTE: This logic assumes that we only get a single offer for each host in a given batch
+ for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
+ d.declineOffer(o.getId)
+ }
+
+ // Decline offers we ruled out immediately
+ unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}