author    Matei Zaharia <matei@eecs.berkeley.edu> 2012-07-08 10:52:13 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu> 2012-07-08 10:52:13 -0700
commit    1aa63f775b9ecfa5225449de5cac8427c0e90d54 (patch)
tree      46ff0bb4af838d1428809da2a83c3973d4edf0b8
parent    c5cc10cda3c0e86773c67684eaa4876ab515fdb7 (diff)
Added back coarse-grained Mesos scheduler based on StandaloneScheduler.
 core/src/main/scala/spark/SparkContext.scala                                 |  22
 core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala           |  14
 core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala           |   1
 core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala   |   2
 core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala |  52
 core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala            |   6
 core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala             |   5
 core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala         | 371
 core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala  | 251
 core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala        |  10
 core/src/main/scala/spark/util/SerializableBuffer.scala                      |   2
 11 files changed, 320 insertions(+), 416 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8a06642426..22e1d52f65 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-import org.apache.mesos.MesosNativeLibrary
+import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
@@ -41,8 +41,8 @@ import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosSchedulerBackend
+import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler}
+import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerMaster
class SparkContext(
@@ -89,17 +89,15 @@ class SparkContext(
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>
MesosNativeLibrary.load()
- val sched = new ClusterScheduler(this)
- val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
- sched.initialize(schedContext)
- sched
- /*
- if (System.getProperty("spark.mesos.coarse", "false") == "true") {
- new CoarseMesosScheduler(this, master, frameworkName)
+ val scheduler = new ClusterScheduler(this)
+ val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+ val backend = if (coarseGrained) {
+ new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName)
} else {
- new MesosSchedulerBackend(this, master, frameworkName)
+ new MesosSchedulerBackend(scheduler, this, master, frameworkName)
}
- */
+ scheduler.initialize(backend)
+ scheduler
}
}
taskScheduler.start()
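With this change, a job opts into the coarse-grained backend by setting the spark.mesos.coarse system property before constructing its SparkContext. A minimal sketch, with an illustrative Mesos master address and job name:

    // Sketch: enable the coarse-grained Mesos backend. The master address
    // and job name below are placeholders, not part of this patch.
    System.setProperty("spark.mesos.coarse", "true")
    val sc = new SparkContext("host:5050", "CoarseGrainedJob")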
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index b717ed2b77..26b163de0a 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -7,11 +7,11 @@ import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
-import spark.scheduler.standalone._
-import spark.scheduler.standalone.RegisteredSlave
-import spark.scheduler.standalone.LaunchTask
-import spark.scheduler.standalone.RegisterSlaveFailed
-import spark.scheduler.standalone.RegisterSlave
+import spark.scheduler.cluster._
+import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.LaunchTask
+import spark.scheduler.cluster.RegisterSlaveFailed
+import spark.scheduler.cluster.RegisterSlave
class StandaloneExecutorBackend(
@@ -31,6 +31,7 @@ class StandaloneExecutorBackend(
override def preStart() {
try {
+ logInfo("Connecting to master: " + masterUrl)
master = context.actorFor(masterUrl)
master ! RegisterSlave(slaveId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
@@ -51,7 +52,8 @@ class StandaloneExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)
- case LaunchTask(slaveId_, taskDesc) =>
+ case LaunchTask(taskDesc) =>
+ logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7f1664b483..5b59479682 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -108,7 +108,6 @@ class ClusterScheduler(sc: SparkContext)
}
}
-
/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 7f19fe0cc5..80e8733671 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -7,7 +7,7 @@ import spark.util.SerializableBuffer
sealed trait StandaloneClusterMessage extends Serializable
// Master to slaves
-case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
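Because the messages extend a sealed trait, either endpoint can match on them exhaustively. An illustrative handler for the master-to-slaves half of the protocol (the bodies are placeholders):

    // Sketch: dispatching on the master-to-slaves messages defined above.
    def handle(msg: StandaloneClusterMessage) {
      msg match {
        case LaunchTask(task) => println("launch task " + task.taskId)
        case RegisteredSlave(props) => println("registered; received " + props.size + " spark properties")
        case RegisterSlaveFailed(message) => println("registration failed: " + message)
        case _ => // slave-to-master messages, not handled in this sketch
      }
    }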
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 1acf9e86de..c3132abd7a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -1,6 +1,6 @@
package spark.scheduler.cluster
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor.{Props, Actor, ActorRef, ActorSystem}
import akka.util.duration._
@@ -21,19 +21,24 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- class MasterActor extends Actor {
+ class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
val slaveActor = new HashMap[String, ActorRef]
val slaveHost = new HashMap[String, String]
val freeCores = new HashMap[String, Int]
def receive = {
case RegisterSlave(slaveId, host, cores) =>
- slaveActor(slaveId) = sender
- logInfo("Registered slave: " + sender + " with ID " + slaveId)
- slaveHost(slaveId) = host
- freeCores(slaveId) = cores
- totalCoreCount.addAndGet(cores)
- makeOffers()
+ if (slaveActor.contains(slaveId)) {
+ sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId)
+ } else {
+ logInfo("Registered slave: " + sender + " with ID " + slaveId)
+ sender ! RegisteredSlave(sparkProperties)
+ slaveActor(slaveId) = sender
+ slaveHost(slaveId) = host
+ freeCores(slaveId) = cores
+ totalCoreCount.addAndGet(cores)
+ makeOffers()
+ }
case StatusUpdate(slaveId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
@@ -42,10 +47,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
makeOffers(slaveId)
}
- case LaunchTask(slaveId, task) =>
- freeCores(slaveId) -= 1
- slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
case ReviveOffers =>
makeOffers()
@@ -58,14 +59,22 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Make fake resource offers on all slaves
def makeOffers() {
- scheduler.resourceOffers(
- slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+ launchTasks(scheduler.resourceOffers(
+ slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
// Make fake resource offers on just one slave
def makeOffers(slaveId: String) {
- scheduler.resourceOffers(
- Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+ launchTasks(scheduler.resourceOffers(
+ Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId)))))
+ }
+
+ // Launch tasks returned by a set of resource offers
+ def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+ for (task <- tasks.flatten) {
+ freeCores(task.slaveId) -= 1
+ slaveActor(task.slaveId) ! LaunchTask(task)
+ }
}
}
@@ -73,8 +82,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
def start() {
+ val properties = new ArrayBuffer[(String, String)]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.")) {
+ properties += ((key, value))
+ }
+ }
masterActor = actorSystem.actorOf(
- Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+ Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
}
def stop() {
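The snapshot of spark.* properties built in start() uses an explicit Java iterator; an equivalent sketch using the JavaConversions wrappers that other files in this patch already import:

    import scala.collection.JavaConversions._

    // Sketch: same spark.* property snapshot, via the Scala collection wrappers.
    val sparkProperties: Seq[(String, String)] = System.getProperties.entrySet
      .map(entry => (entry.getKey.toString, entry.getValue.toString))
      .filter(_._1.startsWith("spark."))
      .toSeq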
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index b0b3cbe7d5..f9a1b74fa5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -3,7 +3,11 @@ package spark.scheduler.cluster
import java.nio.ByteBuffer
import spark.util.SerializableBuffer
-class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+class TaskDescription(
+ val taskId: Long,
+ val slaveId: String,
+ val name: String,
+ _serializedTask: ByteBuffer)
extends Serializable {
// Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index ab07f1c8c2..be24316e80 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -191,7 +191,7 @@ class TaskSetManager(
def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
- var localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
+ val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
findTask(host, localOnly) match {
case Some(index) => {
@@ -218,7 +218,7 @@ class TaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
- return Some(new TaskDescription(taskId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
}
case _ =>
}
@@ -227,7 +227,6 @@ class TaskSetManager(
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData)
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
deleted file mode 100644
index 0a6e1350be..0000000000
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-package spark.scheduler.mesos
-
-/*
-import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.{ArrayList => JArrayList}
-import java.util.{List => JList}
-import java.util.{HashMap => JHashMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-import scala.collection.mutable.PriorityQueue
-import scala.collection.JavaConversions._
-import scala.math.Ordering
-
-import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
-import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
-import akka.util.duration._
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MTaskInfo, TaskState => MesosTaskState, _}
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler}
-
-
-sealed trait CoarseMesosSchedulerMessage
-case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage
-case class StatusUpdate(slaveId: String, status: TaskStatus) extends CoarseMesosSchedulerMessage
-case class LaunchTask(slaveId: String, task: MTaskInfo) extends CoarseMesosSchedulerMessage
-case class ReviveOffers() extends CoarseMesosSchedulerMessage
-
-case class FakeOffer(slaveId: String, host: String, cores: Int)
-
-/**
- * Mesos scheduler that uses coarse-grained tasks and does its own fine-grained scheduling inside
- * them using Akka actors for messaging. Clients should first call start(), then submit task sets
- * through the runTasks method.
- *
- * TODO: This is a pretty big hack for now.
- */
-class CoarseMesosScheduler(
- sc: SparkContext,
- master: String,
- frameworkName: String)
- extends ClusterScheduler(sc, master, frameworkName) {
-
- val actorSystem = sc.env.actorSystem
- val actorName = "CoarseMesosScheduler"
- val coresPerSlave = System.getProperty("spark.coarseMesosScheduler.coresPerSlave", "4").toInt
-
- class MasterActor extends Actor {
- val slaveActor = new HashMap[String, ActorRef]
- val slaveHost = new HashMap[String, String]
- val freeCores = new HashMap[String, Int]
-
- def receive = {
- case RegisterSlave(slaveId, host) =>
- slaveActor(slaveId) = sender
- logInfo("Registered slave: " + sender + " with ID " + slaveId)
- slaveHost(slaveId) = host
- freeCores(slaveId) = coresPerSlave
- makeFakeOffers()
-
- case StatusUpdate(slaveId, status) =>
- fakeStatusUpdate(status)
- if (isFinished(status.getState)) {
- freeCores(slaveId) += 1
- makeFakeOffers(slaveId)
- }
-
- case LaunchTask(slaveId, task) =>
- freeCores(slaveId) -= 1
- slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
- case ReviveOffers() =>
- logInfo("Reviving offers")
- makeFakeOffers()
- }
-
- // Make fake resource offers for all slaves
- def makeFakeOffers() {
- fakeResourceOffers(slaveHost.toSeq.map{case (id, host) => FakeOffer(id, host, freeCores(id))})
- }
-
- // Make fake resource offers for just one slave
- def makeFakeOffers(slaveId: String) {
- fakeResourceOffers(Seq(FakeOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
- }
- }
-
- val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName)
-
- val taskIdsOnSlave = new HashMap[String, HashSet[String]]
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
- * tasks are balanced across the cluster.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- val tasks = offers.map(o => new JArrayList[MTaskInfo])
- for (i <- 0 until offers.size) {
- val o = offers.get(i)
- val slaveId = o.getSlaveId.getValue
- if (!slaveIdToHost.contains(slaveId)) {
- slaveIdToHost(slaveId) = o.getHostname
- hostsAlive += o.getHostname
- taskIdsOnSlave(slaveId) = new HashSet[String]
- // Launch an infinite task on the node that will talk to the MasterActor to get fake tasks
- val cpuRes = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
- .build()
- val task = new WorkerTask(slaveId, o.getHostname)
- val serializedTask = Utils.serialize(task)
- tasks(i).add(MTaskInfo.newBuilder()
- .setTaskId(newTaskId())
- .setSlaveId(o.getSlaveId)
- .setExecutor(executorInfo)
- .setName("worker task")
- .addResources(cpuRes)
- .setData(ByteString.copyFrom(serializedTask))
- .build())
- }
- }
- val filters = Filters.newBuilder().setRefuseSeconds(10).build()
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId(), tasks(i), filters)
- }
- }
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val tid = status.getTaskId.getValue
- var taskSetToUpdate: Option[TaskSetManager] = None
- var taskFailed = false
- synchronized {
- try {
- taskIdToTaskSetId.get(tid) match {
- case Some(taskSetId) =>
- if (activeTaskSets.contains(taskSetId)) {
- //activeTaskSets(taskSetId).statusUpdate(status)
- taskSetToUpdate = Some(activeTaskSets(taskSetId))
- }
- if (isFinished(status.getState)) {
- taskIdToTaskSetId.remove(tid)
- if (taskSetTaskIds.contains(taskSetId)) {
- taskSetTaskIds(taskSetId) -= tid
- }
- val slaveId = taskIdToSlaveId(tid)
- taskIdToSlaveId -= tid
- taskIdsOnSlave(slaveId) -= tid
- }
- if (status.getState == MesosTaskState.TASK_FAILED) {
- taskFailed = true
- }
- case None =>
- logInfo("Ignoring update from TID " + tid + " because its task set is gone")
- }
- } catch {
- case e: Exception => logError("Exception in statusUpdate", e)
- }
- }
- // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
- if (taskSetToUpdate != None) {
- taskSetToUpdate.get.statusUpdate(status)
- }
- if (taskFailed) {
- // Revive offers if a task had failed for some reason other than host lost
- reviveOffers()
- }
- }
-
- override def slaveLost(d: SchedulerDriver, s: SlaveID) {
- logInfo("Slave lost: " + s.getValue)
- var failedHost: Option[String] = None
- var lostTids: Option[HashSet[String]] = None
- synchronized {
- val slaveId = s.getValue
- val host = slaveIdToHost(slaveId)
- if (hostsAlive.contains(host)) {
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- failedHost = Some(host)
- lostTids = Some(taskIdsOnSlave(slaveId))
- logInfo("failedHost: " + host)
- logInfo("lostTids: " + lostTids)
- taskIdsOnSlave -= slaveId
- activeTaskSetsQueue.foreach(_.hostLost(host))
- }
- }
- if (failedHost != None) {
- // Report all the tasks on the failed host as lost, without holding a lock on this
- for (tid <- lostTids.get; taskSetId <- taskIdToTaskSetId.get(tid)) {
- // TODO: Maybe call our statusUpdate() instead to clean our internal data structures
- activeTaskSets(taskSetId).statusUpdate(TaskStatus.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(tid).build())
- .setState(MesosTaskState.TASK_LOST)
- .build())
- }
- // Also report the loss to the DAGScheduler
- listener.hostLost(failedHost.get)
- reviveOffers()
- }
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- // Check for speculatable tasks in all our active jobs.
- override def checkSpeculatableTasks() {
- var shouldRevive = false
- synchronized {
- for (ts <- activeTaskSetsQueue) {
- shouldRevive |= ts.checkSpeculatableTasks()
- }
- }
- if (shouldRevive) {
- reviveOffers()
- }
- }
-
-
- val lock2 = new Object
- var firstWait = true
-
- override def waitForRegister() {
- lock2.synchronized {
- if (firstWait) {
- super.waitForRegister()
- Thread.sleep(5000)
- firstWait = false
- }
- }
- }
-
- def fakeStatusUpdate(status: TaskStatus) {
- statusUpdate(driver, status)
- }
-
- def fakeResourceOffers(offers: Seq[FakeOffer]) {
- logDebug("fakeResourceOffers: " + offers)
- val availableCpus = offers.map(_.cores.toDouble).toArray
- var launchedTask = false
- for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
- do {
- launchedTask = false
- for (i <- 0 until offers.size if hostsAlive.contains(offers(i).host)) {
- manager.slaveOffer(offers(i).slaveId, offers(i).host, availableCpus(i)) match {
- case Some(task) =>
- val tid = task.getTaskId.getValue
- val sid = offers(i).slaveId
- taskIdToTaskSetId(tid) = manager.taskSet.id
- taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToSlaveId(tid) = sid
- taskIdsOnSlave(sid) += tid
- slaveIdsWithExecutors += sid
- availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
- launchedTask = true
- masterActor ! LaunchTask(sid, task)
-
- case None => {}
- }
- }
- } while (launchedTask)
- }
- }
-
- override def reviveOffers() {
- masterActor ! ReviveOffers()
- }
-}
-
-class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
- generation = 0
-
- def run(id: Long) {
- val env = SparkEnv.get
- val classLoader = Thread.currentThread.getContextClassLoader
- val actor = env.actorSystem.actorOf(
- Props(new WorkerActor(slaveId, host, env, classLoader)),
- name = "WorkerActor")
- // Wait forever so that our Mesos task doesn't end
- while (true) {
- Thread.sleep(10000)
- }
- }
-}
-
-class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader)
- extends Actor with Logging {
-
- val threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
- val masterIp: String = System.getProperty("spark.master.host", "localhost")
- val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
- val masterActor = env.actorSystem.actorFor(
- "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler"))
-
- class TaskRunner(desc: MTaskInfo)
- extends Runnable {
- override def run() {
- val tid = desc.getTaskId.getValue
- logInfo("Running task ID " + tid)
- try {
- SparkEnv.set(env)
- Thread.currentThread.setContextClassLoader(classLoader)
- Accumulators.clear
- val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
- env.mapOutputTracker.updateGeneration(task.generation)
- val value = task.run(tid.toInt)
- val accumUpdates = Accumulators.values
- val result = new TaskResult(value, accumUpdates)
- masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
- .setState(MesosTaskState.TASK_FINISHED)
- .setData(ByteString.copyFrom(Utils.serialize(result)))
- .build())
- logInfo("Finished task ID " + tid)
- } catch {
- case ffe: FetchFailedException => {
- val reason = ffe.toTaskEndReason
- masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
- .setState(MesosTaskState.TASK_FAILED)
- .setData(ByteString.copyFrom(Utils.serialize(reason)))
- .build())
- }
- case t: Throwable => {
- val reason = ExceptionFailure(t)
- masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
- .setState(MesosTaskState.TASK_FAILED)
- .setData(ByteString.copyFrom(Utils.serialize(reason)))
- .build())
-
- // TODO: Should we exit the whole executor here? On the one hand, the failed task may
- // have left some weird state around depending on when the exception was thrown, but on
- // the other hand, maybe we could detect that when future tasks fail and exit then.
- logError("Exception in task ID " + tid, t)
- //System.exit(1)
- }
- }
- }
- }
-
- override def preStart {
- logInfo("Registering with master")
- masterActor ! RegisterSlave(slaveId, host)
- }
-
- override def receive = {
- case LaunchTask(slaveId_, task) =>
- threadPool.execute(new TaskRunner(task))
- }
-}
-
-*/
\ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
new file mode 100644
index 0000000000..040cd6b335
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -0,0 +1,251 @@
+package spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import spark.TaskState
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. This gives lower and more predictable latency, since an executor
+ * stays running on each node between tasks.
+ *
+ * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
+ * remove this.
+ */
+class CoarseMesosSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ frameworkName: String)
+ extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+ with MScheduler
+ with Logging {
+
+ // Environment variables to pass to our executors
+ val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+ "SPARK_MEM",
+ "SPARK_CLASSPATH",
+ "SPARK_LIBRARY_PATH",
+ "SPARK_JAVA_OPTS"
+ )
+
+ // Memory used by each executor (in megabytes)
+ val EXECUTOR_MEMORY = {
+ if (System.getenv("SPARK_MEM") != null) {
+ Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ } else {
+ 512
+ }
+ }
+
+ // Lock used to wait for scheduler to be registered
+ var isRegistered = false
+ val registeredLock = new Object()
+
+ // Driver for talking to Mesos
+ var driver: SchedulerDriver = null
+
+ // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+ // Cores we have acquired with each Mesos task ID
+ val coresByTaskId = new HashMap[Int, Int]
+ var totalCoresAcquired = 0
+
+ val slaveIdsWithExecutors = new HashSet[String]
+
+ val sparkHome = sc.getSparkHome() match {
+ case Some(path) =>
+ path
+ case None =>
+ throw new SparkException("Spark home is not set; set it through the spark.home system " +
+ "property, the SPARK_HOME environment variable or the SparkContext constructor")
+ }
+
+ var nextMesosTaskId = 0
+
+ def newMesosTaskId(): Int = {
+ val id = nextMesosTaskId
+ nextMesosTaskId += 1
+ id
+ }
+
+ override def start() {
+ super.start()
+
+ synchronized {
+ new Thread("CoarseMesosSchedulerBackend driver") {
+ setDaemon(true)
+ override def run() {
+ val scheduler = CoarseMesosSchedulerBackend.this
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+ try {
+ val ret = driver.run()
+ logInfo("driver.run() returned with code " + ret)
+ } catch {
+ case e: Exception => logError("driver.run() failed", e)
+ }
+ }
+ }.start()
+
+ waitForRegister()
+ }
+ }
+
+ def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+ val runScript = new File(sparkHome, "run").getCanonicalPath
+ val masterUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
+ val environment = Environment.newBuilder()
+ for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+ if (System.getenv(key) != null) {
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(System.getenv(key))
+ .build())
+ }
+ }
+ return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+ }
+
+ override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ }
+
+ def waitForRegister() {
+ registeredLock.synchronized {
+ while (!isRegistered) {
+ registeredLock.wait()
+ }
+ }
+ }
+
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+ * unless we've already launched more than we wanted to.
+ */
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ synchronized {
+ val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+ for (offer <- offers) {
+ val slaveId = offer.getSlaveId.getValue
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus").toInt
+ if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 &&
+ !slaveIdsWithExecutors.contains(slaveId)) {
+ // Launch an executor on the slave
+ val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ val taskId = newMesosTaskId()
+ val task = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse))
+ .setName("Task " + taskId)
+ .addResources(createResource("cpus", cpusToUse))
+ .addResources(createResource("mem", EXECUTOR_MEMORY))
+ .build()
+ d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+ } else {
+ // Filter it out
+ d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+ }
+ }
+ }
+ }
+
+ /** Helper function to pull out a resource from a Mesos Resources protobuf */
+ def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ // If we reached here, no resource with the required name was present
+ throw new IllegalArgumentException("No resource called " + name + " in " + res)
+ }
+
+ /** Build a Mesos resource protobuf object */
+ def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+ Resource.newBuilder()
+ .setName(resourceName)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+ .build()
+ }
+
+ /** Check whether a Mesos task state represents a finished task */
+ def isFinished(state: MesosTaskState) = {
+ state == MesosTaskState.TASK_FINISHED ||
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ val taskId = status.getTaskId.getValue.toInt
+ logInfo("Mesos task " + taskId + " is now " + status.getState)
+ synchronized {
+ if (isFinished(status.getState)) {
+ // Remove the cores we have remembered for this task, if it's in the hashmap
+ for (cores <- coresByTaskId.get(taskId)) {
+ totalCoresAcquired -= cores
+ coresByTaskId -= taskId
+ driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+ }
+ }
+ }
+ }
+
+ override def error(d: SchedulerDriver, message: String) {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ }
+
+ override def stop() {
+ super.stop()
+ if (driver != null) {
+ driver.stop()
+ }
+ }
+
+ override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ slaveIdsWithExecutors -= slaveId.getValue
+ }
+ }
+
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+ slaveLost(d, s)
+ }
+}
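A small round trip through the resource helpers above, from inside the backend, with illustrative quantities:

    // Sketch: build an offer-style resource list and read it back
    // (8 cpus / 4096 MB are illustrative values).
    val resources = java.util.Arrays.asList(
      createResource("cpus", 8),
      createResource("mem", 4096))
    val cpus = getResource(resources, "cpus").toInt          // 8
    val enoughMem = getResource(resources, "mem") >= EXECUTOR_MEMORY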
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 110b178582..44eda93dd1 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,6 +15,11 @@ import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import spark.TaskState
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
class MesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
@@ -60,11 +65,10 @@ class MesosSchedulerBackend(
synchronized {
new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
-
override def run() {
- val sched = MesosSchedulerBackend.this
+ val scheduler = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
- driver = new MesosSchedulerDriver(sched, fwInfo, master)
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index fa9e6d62cb..0830843a77 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,7 +8,7 @@ import java.nio.channels.Channels
* A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
* it easier to pass ByteBuffers in case class messages.
*/
-class SerializableBuffer(@transient var buffer: ByteBuffer) {
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer
private def readObject(in: ObjectInputStream) {
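The added extends Serializable is the substantive fix here: without it, passing a SerializableBuffer through Java serialization throws NotSerializableException. A minimal round-trip sketch, with illustrative buffer contents:

    import java.io._
    import java.nio.ByteBuffer
    import spark.util.SerializableBuffer

    // Sketch: round-trip a ByteBuffer through Java serialization.
    val wrapped = new SerializableBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(wrapped)   // would throw without "extends Serializable"
    out.close()
    val restored = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[SerializableBuffer]
    assert(restored.value.limit == 3)  // expect the 3 bytes back (assumes readObject repositions the buffer)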