Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 82
1 file changed, 19 insertions(+), 63 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 82f652dae0..3412301e64 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,20 +18,17 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.{List => JList}
-import java.util.Collections
+import java.util.{Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -49,17 +46,10 @@ private[spark] class CoarseMesosSchedulerBackend(
master: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with MScheduler
- with Logging {
+ with MesosSchedulerUtils {
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
@@ -87,26 +77,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def start() {
super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
+ val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
+ startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
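
The inline driver thread and registration wait above collapse into a single startScheduler() call, presumably provided by the MesosSchedulerUtils trait this class now mixes in. A rough sketch of that trait's shape, inferred from the deleted code (the real trait may differ):

    import org.apache.mesos.Protos.FrameworkInfo
    import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}

    trait MesosSchedulerUtilsSketch {
      // Shared driver handle; replaces the per-backend `driver` field deleted above.
      @volatile protected var mesosDriver: SchedulerDriver = null

      protected def startScheduler(master: String, scheduler: Scheduler,
          fwInfo: FrameworkInfo): Unit = {
        val t = new Thread("MesosSchedulerBackend driver") {
          setDaemon(true)
          override def run(): Unit = {
            mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, master)
            mesosDriver.run() // blocks until the driver is stopped
          }
        }
        t.start()
        waitForRegister() // block start() until Mesos confirms registration
      }

      protected def waitForRegister(): Unit // see the registration sketch below
    }
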
@@ -150,8 +122,10 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
- val uri = conf.get("spark.executor.uri", null)
- if (uri == null) {
+ val uri = conf.getOption("spark.executor.uri")
+ .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
+ if (uri.isEmpty) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
@@ -164,7 +138,7 @@ private[spark] class CoarseMesosSchedulerBackend(
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
+ val basename = uri.get.split('/').last.split('.').head
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
@@ -173,7 +147,7 @@ private[spark] class CoarseMesosSchedulerBackend(
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
command.build()
}
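
The URI lookup now goes through Option, adding a fallback to the SPARK_EXECUTOR_URI environment variable before giving up. A standalone illustration of the resolution order and of the basename globbing used above:

    object ExecutorUriDemo extends App {
      // Stand-in for conf.getOption("spark.executor.uri"); None means the key is unset.
      val fromConf: Option[String] = None
      val uri = fromConf.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))

      uri match {
        case Some(u) =>
          // Same trick as the diff: keep everything before the first '.' so that
          // "$basename*" globs the unpacked executor directory.
          val basename = u.split('/').last.split('.').head
          println(s"fetch $u and cd $basename*")
        case None =>
          println("no executor URI; run spark-class from the local installation")
      }
    }
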
@@ -183,18 +157,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
+ markRegistered()
}
override def disconnected(d: SchedulerDriver) {}
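
markRegistered() replaces the hand-rolled lock/notifyAll handshake deleted above. A minimal sketch of the assumed pattern, written with a CountDownLatch rather than Spark's actual field names:

    import java.util.concurrent.CountDownLatch

    object RegistrationSketch {
      private val registerLatch = new CountDownLatch(1)

      // Called from registered() once Mesos assigns a framework ID.
      def markRegistered(): Unit = registerLatch.countDown()

      // Called from startScheduler(); blocks until markRegistered() fires.
      def waitForRegister(): Unit = registerLatch.await()
    }
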
@@ -245,14 +208,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- private def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- 0
- }
-
/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
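
getResource() is deleted here, presumably relocated to MesosSchedulerUtils alongside the other shared helpers. A re-creation equivalent to the removed body (first scalar resource matching the name, else 0):

    import java.util.{List => JList}
    import scala.collection.JavaConversions._
    import org.apache.mesos.Protos.Resource

    object ResourceSketch {
      def getResource(res: JList[Resource], name: String): Double =
        res.find(_.getName == name).map(_.getScalar.getValue).getOrElse(0.0)
    }
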
@@ -284,7 +239,8 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+ // In case we'd rejected everything before but have now lost a node
+ mesosDriver.reviveOffers()
}
}
}
@@ -296,8 +252,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def stop() {
super.stop()
- if (driver != null) {
- driver.stop()
+ if (mesosDriver != null) {
+ mesosDriver.stop()
}
}