diff options
author | Jongyoul Lee <jongyoul@gmail.com> | 2014-11-24 19:14:14 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-11-24 19:21:02 -0800 |
commit | 10e433919a9a3520007099a3876b47f74c046f12 (patch) | |
tree | d54f8857053c157bd034bed9c17d6de287b07d7a /core/src/main | |
parent | e7b8bf067a2606e381f2081db95d9c613391afef (diff) | |
download | spark-10e433919a9a3520007099a3876b47f74c046f12.tar.gz spark-10e433919a9a3520007099a3876b47f74c046f12.tar.bz2 spark-10e433919a9a3520007099a3876b47f74c046f12.zip |
[SPARK-4525] Mesos should decline unused offers
Functionally, this is just a small change on top of #3393 (by jongyoul). The issue being addressed is discussed in the comments there. I have not yet added a test for the bug there. I will add one shortly.
I've also done some minor renaming/clean-up of variables in this class and tests.
Author: Patrick Wendell <pwendell@gmail.com>
Author: Jongyoul Lee <jongyoul@gmail.com>
Closes #3436 from pwendell/mesos-issue and squashes the following commits:
58c35b5 [Patrick Wendell] Adding unit test for this situation
c4f0697 [Patrick Wendell] Additional clean-up and fixes on top of existing fix
f20f1b3 [Jongyoul Lee] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers - Added code for declining unused offers among acceptedOffers - Edited testCase for checking declining unused offers
(cherry picked from commit b043c27424d05e3200e7ba99a1a65656b57fa2f0)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d13795186c..10e6886c16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -208,10 +208,12 @@ private[spark] class MesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { - val (acceptedOffers, declinedOffers) = offers.partition { o => + // Fail-fast on offers we know will be rejected + val (usableOffers, unUsableOffers) = offers.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue + // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task cpus >= 2 * scheduler.CPUS_PER_TASK) || @@ -219,11 +221,12 @@ private[spark] class MesosSchedulerBackend( cpus >= scheduler.CPUS_PER_TASK) } - val offerableWorkers = acceptedOffers.map { o => + val workerOffers = usableOffers.map { o => val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { getResource(o.getResourcesList, "cpus").toInt } else { // If the executor doesn't exist yet, subtract CPU for executor + // TODO(pwendell): Should below just subtract "1"? getResource(o.getResourcesList, "cpus").toInt - scheduler.CPUS_PER_TASK } @@ -233,17 +236,20 @@ private[spark] class MesosSchedulerBackend( cpus) } - val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap + val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] + val slavesIdsOfAcceptedOffers = HashSet[String]() + // Call into the TaskSchedulerImpl - scheduler.resourceOffers(offerableWorkers) - .filter(!_.isEmpty) + val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) + acceptedOffers .foreach { offer => offer.foreach { taskDesc => val slaveId = taskDesc.executorId slaveIdsWithExecutors += slaveId + slavesIdsOfAcceptedOffers += slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) .add(createMesosTask(taskDesc, slaveId)) @@ -257,7 +263,14 @@ private[spark] class MesosSchedulerBackend( d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } - declinedOffers.foreach(o => d.declineOffer(o.getId)) + // Decline offers that weren't used + // NOTE: This logic assumes that we only get a single offer for each host in a given batch + for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) { + d.declineOffer(o.getId) + } + + // Decline offers we ruled out immediately + unUsableOffers.foreach(o => d.declineOffer(o.getId)) } } |