author     Patrick Wendell <pwendell@gmail.com>    2014-11-24 19:14:14 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2014-11-24 19:14:14 -0800
commit     b043c27424d05e3200e7ba99a1a65656b57fa2f0 (patch)
tree       a2195b564acf2d8ff6e342ab6fed5a8fdee0e064 /core
parent     d24d5bf064572a2319627736b1fbf112b4a78edf (diff)
[SPARK-4525] Mesos should decline unused offers
Functionally, this is just a small change on top of #3393 (by jongyoul). The issue being addressed is discussed in the comments there. I have not yet added a test for the bug there; I will add one shortly. I've also done some minor renaming/clean-up of variables in this class and its tests.

Author: Patrick Wendell <pwendell@gmail.com>
Author: Jongyoul Lee <jongyoul@gmail.com>

Closes #3436 from pwendell/mesos-issue and squashes the following commits:

58c35b5 [Patrick Wendell] Adding unit test for this situation
c4f0697 [Patrick Wendell] Additional clean-up and fixes on top of existing fix
f20f1b3 [Jongyoul Lee] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers
  - Added code for declining unused offers among acceptedOffers
  - Edited test case to check that unused offers are declined
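For context, the shape of the fix is roughly the following (a minimal sketch, not the actual MesosSchedulerBackend code; the helpers isUsable and launchTasksOn are hypothetical stand-ins for the fail-fast resource check and the TaskSchedulerImpl hand-off shown in the diff below):

import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.Offer

object OfferHandlingSketch {
  // Sketch only: isUsable and launchTasksOn are assumed helpers,
  // not real members of MesosSchedulerBackend.
  def handleOffers(
      driver: SchedulerDriver,
      offers: Seq[Offer],
      isUsable: Offer => Boolean,
      launchTasksOn: Seq[Offer] => Set[String]): Unit = {
    // Fail fast: split out offers that can never satisfy the memory/CPU minimums.
    val (usableOffers, unUsableOffers) = offers.partition(isUsable)

    // Hand the usable offers to the scheduler; it may launch tasks on only a
    // subset of them, returning the slave ids it actually used.
    val usedSlaveIds = launchTasksOn(usableOffers)

    // The SPARK-4525 bug: usable-but-unused offers were previously never
    // declined, so Mesos kept them tied up. Decline them explicitly.
    for (o <- usableOffers if !usedSlaveIds.contains(o.getSlaveId.getValue)) {
      driver.declineOffer(o.getId)
    }

    // Offers ruled out up front are declined as before.
    unUsableOffers.foreach(o => driver.declineOffer(o.getId))
  }
}

The actual change below additionally records the accepted slave ids in a HashSet while building the per-slave lists of Mesos TaskInfo objects, and uses that set to decide which usable offers to decline.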
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala   25
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala      61
2 files changed, 65 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d13795186c..10e6886c16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -208,10 +208,12 @@ private[spark] class MesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
- val (acceptedOffers, declinedOffers) = offers.partition { o =>
+ // Fail-fast on offers we know will be rejected
+ val (usableOffers, unUsableOffers) = offers.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
+ // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= 2 * scheduler.CPUS_PER_TASK) ||
@@ -219,11 +221,12 @@ private[spark] class MesosSchedulerBackend(
cpus >= scheduler.CPUS_PER_TASK)
}
- val offerableWorkers = acceptedOffers.map { o =>
+ val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the executor doesn't exist yet, subtract CPU for executor
+ // TODO(pwendell): Should below just subtract "1"?
getResource(o.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK
}
@@ -233,17 +236,20 @@ private[spark] class MesosSchedulerBackend(
cpus)
}
- val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
+ val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
+ val slavesIdsOfAcceptedOffers = HashSet[String]()
+
// Call into the TaskSchedulerImpl
- scheduler.resourceOffers(offerableWorkers)
- .filter(!_.isEmpty)
+ val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
+ acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
+ slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
@@ -257,7 +263,14 @@ private[spark] class MesosSchedulerBackend(
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
- declinedOffers.foreach(o => d.declineOffer(o.getId))
+ // Decline offers that weren't used
+ // NOTE: This logic assumes that we only get a single offer for each host in a given batch
+ for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
+ d.declineOffer(o.getId)
+ }
+
+ // Decline offers we ruled out immediately
+ unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index bef8d3a58b..e60e70afd3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -30,9 +30,11 @@ import java.nio.ByteBuffer
import java.util.Collections
import java.util
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
- test("mesos resource offer is launching tasks") {
+
+ test("mesos resource offers result in launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int) = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
@@ -43,46 +45,61 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build()
+ builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}
val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val sc = EasyMock.createMock(classOf[SparkContext])
-
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
EasyMock.replay(sc)
+
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4
- val offers = new java.util.ArrayList[Offer]
- offers.add(createOffer(1, minMem, minCpu))
- offers.add(createOffer(1, minMem - 1, minCpu))
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(createOffer(1, minMem, minCpu))
+ mesosOffers.add(createOffer(2, minMem - 1, minCpu))
+ mesosOffers.add(createOffer(3, minMem, minCpu))
+
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
- val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
- o.getSlaveId.getValue,
- o.getHostname,
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2
+ ))
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(2).getSlaveId.getValue,
+ mesosOffers.get(2).getHostname,
2
))
val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
- EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
+ EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
+
val capture = new Capture[util.Collection[TaskInfo]]
EasyMock.expect(
driver.launchTasks(
- EasyMock.eq(Collections.singleton(offers.get(0).getId)),
+ EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
EasyMock.capture(capture),
EasyMock.anyObject(classOf[Filters])
)
- ).andReturn(Status.valueOf(1))
- EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1))
+ ).andReturn(Status.valueOf(1)).once
+ EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
+ EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
- backend.resourceOffers(driver, offers)
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ EasyMock.verify(driver)
assert(capture.getValue.size() == 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
@@ -90,5 +107,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
assert(cpus.getName.equals("cpus"))
assert(cpus.getScalar.getValue.equals(2.0))
assert(taskInfo.getSlaveId.getValue.equals("s1"))
+
+ // Unwanted resources offered on an existing node. Make sure they are declined
+ val mesosOffers2 = new java.util.ArrayList[Offer]
+ mesosOffers2.add(createOffer(1, minMem, minCpu))
+ EasyMock.reset(taskScheduler)
+ EasyMock.reset(driver)
+ EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]]))).andReturn(Seq(Seq()))
+ EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
+ EasyMock.replay(taskScheduler)
+ EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
+ EasyMock.replay(driver)
+
+ backend.resourceOffers(driver, mesosOffers2)
+ EasyMock.verify(driver)
}
}
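A side note on the mocking style used in the test above: it follows EasyMock's record/replay/verify lifecycle. Expectations are recorded with EasyMock.expect, the mock is switched into replay mode, the code under test runs, and EasyMock.verify fails if any recorded call did not happen as declared (EasyMock.reset clears the recorded state so a mock can be reused, as done for the second batch of offers). A minimal, self-contained sketch of that lifecycle, using a hypothetical Greeter trait rather than the Spark classes:

import org.easymock.EasyMock

trait Greeter {
  def greet(name: String): String
}

object EasyMockLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val greeter = EasyMock.createMock(classOf[Greeter])

    // Record phase: declare the expected call and its return value.
    EasyMock.expect(greeter.greet("mesos")).andReturn("hello mesos").times(1)

    // Replay phase: the mock now enforces the recorded expectations.
    EasyMock.replay(greeter)
    assert(greeter.greet("mesos") == "hello mesos")

    // Verify phase: fails if greet("mesos") was not called exactly once.
    EasyMock.verify(greeter)
  }
}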