Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala      |  55
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala | 139
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala         |  74
3 files changed, 170 insertions(+), 98 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8cda4ff0eb..2df7b1120b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
val appJar = CommandInfo.URI.newBuilder()
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
val builder = CommandInfo.newBuilder().addUris(appJar)
- val entries =
- (conf.getOption("spark.executor.extraLibraryPath").toList ++
- desc.command.libraryPathEntries)
+ val entries = conf.getOption("spark.executor.extraLibraryPath")
+ .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+ .getOrElse(desc.command.libraryPathEntries)
+
val prefixEnv = if (!entries.isEmpty) {
Utils.libraryPathEnvPrefix(entries)
} else {
@@ -442,9 +443,12 @@ private[spark] class MesosClusterScheduler(
options
}
- private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+ private class ResourceOffer(
+ val offerId: OfferID,
+ val slaveId: SlaveID,
+ var resources: JList[Resource]) {
override def toString(): String = {
- s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+ s"Offer id: ${offerId}, resources: ${resources}"
}
}
@@ -463,27 +467,29 @@ private[spark] class MesosClusterScheduler(
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
- o.cpu >= driverCpu && o.mem >= driverMem
+ getResource(o.resources, "cpus") >= driverCpu &&
+ getResource(o.resources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
- offer.cpu -= driverCpu
- offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val cpuResource = createResource("cpus", driverCpu)
- val memResource = createResource("mem", driverMem)
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.resources, "cpus", driverCpu)
+ val (finalResources, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
- .setSlaveId(offer.offer.getSlaveId)
+ .setSlaveId(offer.slaveId)
.setCommand(commandInfo)
- .addResources(cpuResource)
- .addResources(memResource)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+ offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties
@@ -496,11 +502,11 @@ private[spark] class MesosClusterScheduler(
container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build())
}
- val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+ val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
- logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+ logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+ val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
@@ -510,14 +516,14 @@ private[spark] class MesosClusterScheduler(
}
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
- val currentOffers = offers.asScala.map(o =>
- new ResourceOffer(
- o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
- ).toList
- logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+ logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date()
+ val currentOffers = offers.asScala.map {
+ o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+ }.toList
+
stateLock.synchronized {
// We first schedule all the supervised drivers that are ready to retry.
// This list will be empty if none of the drivers are marked as supervise.
@@ -541,9 +547,10 @@ private[spark] class MesosClusterScheduler(
tasks.foreach { case (offerId, taskInfos) =>
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}
- offers.asScala
- .filter(o => !tasks.keySet.contains(o.getId))
- .foreach(o => driver.declineOffer(o.getId))
+
+ for (o <- currentOffers if !tasks.contains(o.offerId)) {
+ driver.declineOffer(o.offerId)
+ }
}
private def copyBuffer(
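
[Note: the launch path above leans on partitionResources, a helper that lives outside this diff (in MesosSchedulerUtils) and whose exact shape is assumed here, not taken from this commit. A minimal sketch of the behavior the scheduler now relies on: walk the offer's resources in order, carve the requested amount of a named scalar out of whichever roles supply it, and return both the depleted remainder and the per-role slices to attach to the task.]

import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.mesos.Protos.{Resource, Value}

// Hypothetical sketch of partitionResources: take `amount` of the scalar
// resource `name` from `resources`, spreading the request across roles in
// offer order. Returns (what remains in the offer, what the task consumes).
def partitionResources(
    resources: JList[Resource],
    name: String,
    amount: Double): (List[Resource], List[Resource]) = {
  var remain = amount
  val used = new ArrayBuffer[Resource]
  val remaining = resources.asScala.map { r =>
    if (remain > 0 && r.getName == name && r.getType == Value.Type.SCALAR) {
      val take = math.min(remain, r.getScalar.getValue)
      remain -= take
      // Both halves keep the role of the resource they were carved from.
      used += r.toBuilder
        .setScalar(Value.Scalar.newBuilder().setValue(take)).build()
      r.toBuilder
        .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - take))
        .build()
    } else {
      r
    }
  }
  // Drop scalar resources that this request fully consumed.
  val left = remaining.filter { r =>
    r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0
  }
  (left.toList, used.toList)
}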
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000000..dbef6868f2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{Collection, Collections, Date}
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Scalar, Type}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+
+
+class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+ private var scheduler: MesosClusterScheduler = _
+
+ override def beforeEach(): Unit = {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ }
+
+ test("can queue drivers") {
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val response2 =
+ scheduler.submitDriver(new MesosDriverDescription(
+ "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ assert(response2.success)
+ val state = scheduler.getSchedulerState()
+ val queuedDrivers = state.queuedDrivers.toList
+ assert(queuedDrivers(0).submissionId == response.submissionId)
+ assert(queuedDrivers(1).submissionId == response2.submissionId)
+ }
+
+ test("can kill queued drivers") {
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+ val state = scheduler.getSchedulerState()
+ assert(state.queuedDrivers.isEmpty)
+ }
+
+ test("can handle multiple roles") {
+ val driver = mock[SchedulerDriver]
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
+ command,
+ Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+ "s1",
+ new Date()))
+ assert(response.success)
+ val offer = Offer.newBuilder()
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1000).build())
+ .setName("mem")
+ .setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("role2")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("role2")
+ .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
+ .setId(OfferID.newBuilder().setValue("o1").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+ .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+ .setHostname("host1")
+ .build()
+
+ val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(offer.getId)),
+ capture.capture())
+ ).thenReturn(Status.valueOf(1))
+
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+ val taskInfos = capture.getValue
+ assert(taskInfos.size() == 1)
+ val taskInfo = taskInfos.iterator().next()
+ val resources = taskInfo.getResourcesList
+ assert(scheduler.getResource(resources, "cpus") == 1.5)
+ assert(scheduler.getResource(resources, "mem") == 1200)
+ val resourcesSeq: Seq[Resource] = resources.asScala
+ val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+ assert(cpus.size == 2)
+ assert(cpus.exists(_.getRole().equals("role2")))
+ assert(cpus.exists(_.getRole().equals("*")))
+ val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+ assert(mem.size == 2)
+ assert(mem.exists(_.getRole().equals("role2")))
+ assert(mem.exists(_.getRole().equals("*")))
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer.getId)),
+ capture.capture()
+ )
+ }
+}
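
[Note: the assertions in the "can handle multiple roles" test only hold if getResource sums a named scalar across every role in the list: the 1.5 cpus on the task are presumably 1.0 carved from the "*" role plus 0.5 from "role2", and the 1200 mem is 1000 from "*" plus 200 from "role2". A minimal sketch of such a helper, assuming the real one (defined in MesosSchedulerUtils, not shown in this diff) behaves this way:]

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.mesos.Protos.Resource

// Sketch: the value of a named scalar resource, summed over all roles.
def getResource(resources: JList[Resource], name: String): Double =
  resources.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum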
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
deleted file mode 100644
index 98fdc58786..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import java.util.Date
-
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos._
-
-class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
- private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
-
- test("can queue drivers") {
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
- val scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- scheduler.start()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val response2 =
- scheduler.submitDriver(new MesosDriverDescription(
- "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
- assert(response2.success)
- val state = scheduler.getSchedulerState()
- val queuedDrivers = state.queuedDrivers.toList
- assert(queuedDrivers(0).submissionId == response.submissionId)
- assert(queuedDrivers(1).submissionId == response2.submissionId)
- }
-
- test("can kill queued drivers") {
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
- val scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- scheduler.start()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val killResponse = scheduler.killDriver(response.submissionId)
- assert(killResponse.success)
- val state = scheduler.getSchedulerState()
- assert(state.queuedDrivers.isEmpty)
- }
-}