aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2016-02-22 11:11:33 -0800
committerAndrew Or <andrew@databricks.com>2016-02-22 11:11:33 -0800
commit00461bb911c31aff9c945a14e23df2af4c280c23 (patch)
tree43b30bb2e9703c58a1b5bc84d3dbaa1048a18a95 /core
parent40e6d40fe79ce45d511e049133d2f30a2963740b (diff)
downloadspark-00461bb911c31aff9c945a14e23df2af4c280c23.tar.gz
spark-00461bb911c31aff9c945a14e23df2af4c280c23.tar.bz2
spark-00461bb911c31aff9c945a14e23df2af4c280c23.zip
[SPARK-10749][MESOS] Support multiple roles with mesos cluster mode.
Currently the Mesos cluster dispatcher is not using offers from multiple roles correctly, as it simply aggregates all the offers resource values into one, but doesn't apply them correctly before calling the driver as Mesos needs the resources from the offers to be specified which role it originally belongs to. Multiple roles is already supported with fine/coarse grain scheduler, so porting that logic here to the cluster scheduler. https://issues.apache.org/jira/browse/SPARK-10749 Author: Timothy Chen <tnachen@gmail.com> Closes #8872 from tnachen/cluster_multi_roles.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala55
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala139
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala74
3 files changed, 170 insertions, 98 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8cda4ff0eb..2df7b1120b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
val appJar = CommandInfo.URI.newBuilder()
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
val builder = CommandInfo.newBuilder().addUris(appJar)
- val entries =
- (conf.getOption("spark.executor.extraLibraryPath").toList ++
- desc.command.libraryPathEntries)
+ val entries = conf.getOption("spark.executor.extraLibraryPath")
+ .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+ .getOrElse(desc.command.libraryPathEntries)
+
val prefixEnv = if (!entries.isEmpty) {
Utils.libraryPathEnvPrefix(entries)
} else {
@@ -442,9 +443,12 @@ private[spark] class MesosClusterScheduler(
options
}
- private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+ private class ResourceOffer(
+ val offerId: OfferID,
+ val slaveId: SlaveID,
+ var resources: JList[Resource]) {
override def toString(): String = {
- s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+ s"Offer id: ${offerId}, resources: ${resources}"
}
}
@@ -463,27 +467,29 @@ private[spark] class MesosClusterScheduler(
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
- o.cpu >= driverCpu && o.mem >= driverMem
+ getResource(o.resources, "cpus") >= driverCpu &&
+ getResource(o.resources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
- offer.cpu -= driverCpu
- offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val cpuResource = createResource("cpus", driverCpu)
- val memResource = createResource("mem", driverMem)
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.resources, "cpus", driverCpu)
+ val (finalResources, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
- .setSlaveId(offer.offer.getSlaveId)
+ .setSlaveId(offer.slaveId)
.setCommand(commandInfo)
- .addResources(cpuResource)
- .addResources(memResource)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+ offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties
@@ -496,11 +502,11 @@ private[spark] class MesosClusterScheduler(
container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build())
}
- val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+ val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
- logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+ logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+ val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
@@ -510,14 +516,14 @@ private[spark] class MesosClusterScheduler(
}
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
- val currentOffers = offers.asScala.map(o =>
- new ResourceOffer(
- o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
- ).toList
- logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+ logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date()
+ val currentOffers = offers.asScala.map {
+ o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+ }.toList
+
stateLock.synchronized {
// We first schedule all the supervised drivers that are ready to retry.
// This list will be empty if none of the drivers are marked as supervise.
@@ -541,9 +547,10 @@ private[spark] class MesosClusterScheduler(
tasks.foreach { case (offerId, taskInfos) =>
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}
- offers.asScala
- .filter(o => !tasks.keySet.contains(o.getId))
- .foreach(o => driver.declineOffer(o.getId))
+
+ for (o <- currentOffers if !tasks.contains(o.offerId)) {
+ driver.declineOffer(o.offerId)
+ }
}
private def copyBuffer(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000000..dbef6868f2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{Collection, Collections, Date}
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Scalar, Type}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+
+
+class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+ private var scheduler: MesosClusterScheduler = _
+
+ override def beforeEach(): Unit = {
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = { ready = true }
+ }
+ scheduler.start()
+ }
+
+ test("can queue drivers") {
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val response2 =
+ scheduler.submitDriver(new MesosDriverDescription(
+ "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ assert(response2.success)
+ val state = scheduler.getSchedulerState()
+ val queuedDrivers = state.queuedDrivers.toList
+ assert(queuedDrivers(0).submissionId == response.submissionId)
+ assert(queuedDrivers(1).submissionId == response2.submissionId)
+ }
+
+ test("can kill queued drivers") {
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
+ assert(response.success)
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+ val state = scheduler.getSchedulerState()
+ assert(state.queuedDrivers.isEmpty)
+ }
+
+ test("can handle multiple roles") {
+ val driver = mock[SchedulerDriver]
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
+ command,
+ Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+ "s1",
+ new Date()))
+ assert(response.success)
+ val offer = Offer.newBuilder()
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1000).build())
+ .setName("mem")
+ .setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("role2")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("role2")
+ .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
+ .setId(OfferID.newBuilder().setValue("o1").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+ .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+ .setHostname("host1")
+ .build()
+
+ val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(offer.getId)),
+ capture.capture())
+ ).thenReturn(Status.valueOf(1))
+
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+ val taskInfos = capture.getValue
+ assert(taskInfos.size() == 1)
+ val taskInfo = taskInfos.iterator().next()
+ val resources = taskInfo.getResourcesList
+ assert(scheduler.getResource(resources, "cpus") == 1.5)
+ assert(scheduler.getResource(resources, "mem") == 1200)
+ val resourcesSeq: Seq[Resource] = resources.asScala
+ val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+ assert(cpus.size == 2)
+ assert(cpus.exists(_.getRole().equals("role2")))
+ assert(cpus.exists(_.getRole().equals("*")))
+ val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+ assert(mem.size == 2)
+ assert(mem.exists(_.getRole().equals("role2")))
+ assert(mem.exists(_.getRole().equals("*")))
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(offer.getId)),
+ capture.capture()
+ )
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
deleted file mode 100644
index 98fdc58786..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import java.util.Date
-
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos._
-
-class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
- private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
-
- test("can queue drivers") {
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
- val scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- scheduler.start()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val response2 =
- scheduler.submitDriver(new MesosDriverDescription(
- "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
- assert(response2.success)
- val state = scheduler.getSchedulerState()
- val queuedDrivers = state.queuedDrivers.toList
- assert(queuedDrivers(0).submissionId == response.submissionId)
- assert(queuedDrivers(1).submissionId == response2.submissionId)
- }
-
- test("can kill queued drivers") {
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
- val scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- scheduler.start()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val killResponse = scheduler.killDriver(response.submissionId)
- assert(killResponse.success)
- val state = scheduler.getSchedulerState()
- assert(state.queuedDrivers.isEmpty)
- }
-}