author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-06 17:56:44 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-06 17:56:44 -0700
commit    909b325243ebedfc1bd47fc3d7f70cde178508fc
tree      5a5fc3068fa30a9d46f89fd33f772530f2595874
parent    4e2fe0bdaf7c2626d8b8461fed36259c9830a25c
Further refactoring, and start of a standalone scheduler backend
 core/src/main/scala/spark/SparkContext.scala                                                    |   6
 core/src/main/scala/spark/executor/Executor.scala                                               |   4
 core/src/main/scala/spark/executor/{ExecutorContext.scala => ExecutorBackend.scala}             |   4
 core/src/main/scala/spark/executor/{MesosExecutorRunner.scala => MesosExecutorBackend.scala}    |   8
 core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala                              |  24
 core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala (deleted)             |  10
 core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala (new)                        |  15
 core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala                               |  12
 core/src/main/scala/spark/scheduler/mesos/{MesosScheduler.scala => MesosSchedulerBackend.scala} |   8
 core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala (new)             |  16
 core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala (new)           | 106
 core/src/main/scala/spark/util/SerializableByteBuffer.scala (new)                               |  35
 spark-executor                                                                                  |   2
 13 files changed, 211 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2e8cb609b2..8a06642426 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosScheduler
+import spark.scheduler.mesos.MesosSchedulerBackend
import spark.storage.BlockManagerMaster
class SparkContext(
@@ -90,14 +90,14 @@ class SparkContext(
case _ =>
MesosNativeLibrary.load()
val sched = new ClusterScheduler(this)
- val schedContext = new MesosScheduler(sched, this, master, frameworkName)
+ val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
sched.initialize(schedContext)
sched
/*
if (System.getProperty("spark.mesos.coarse", "false") == "true") {
new CoarseMesosScheduler(this, master, frameworkName)
} else {
- new MesosScheduler(this, master, frameworkName)
+ new MesosSchedulerBackend(this, master, frameworkName)
}
*/
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ac30ae9aec..e3958cec51 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -47,11 +47,11 @@ class Executor extends Logging {
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
}
- def launchTask(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer) {
+ def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
- class TaskRunner(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer)
+ class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {
override def run() {
diff --git a/core/src/main/scala/spark/executor/ExecutorContext.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
index 6b86d8d18a..24c8776f31 100644
--- a/core/src/main/scala/spark/executor/ExecutorContext.scala
+++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -4,8 +4,8 @@ import java.nio.ByteBuffer
import spark.TaskState.TaskState
/**
- * Interface used by Executor to send back updates to the cluster scheduler.
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
*/
-trait ExecutorContext {
+trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
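
The renamed trait is a single callback, so any process that hosts an Executor can supply its own reporting path. As a minimal sketch (hypothetical, not part of this commit), a backend that merely logs status updates would look like this:

    import java.nio.ByteBuffer

    import spark.TaskState.TaskState
    import spark.executor.ExecutorBackend

    // Hypothetical example, not in this commit: a backend that just logs
    // status updates instead of forwarding them to a cluster scheduler.
    class LoggingExecutorBackend extends ExecutorBackend {
      def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        println("Task " + taskId + " -> " + state + " (" + data.remaining() + " result bytes)")
      }
    }
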
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index f97d9d0bfa..50f4e41ede 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,9 +8,9 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-class MesosExecutorRunner(executor: Executor)
+class MesosExecutorBackend(executor: Executor)
extends MesosExecutor
- with ExecutorContext
+ with ExecutorBackend
with Logging {
var driver: ExecutorDriver = null
@@ -59,11 +59,11 @@ class MesosExecutorRunner(executor: Executor)
/**
* Entry point for Mesos executor.
*/
-object MesosExecutorRunner {
+object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
- val runner = new MesosExecutorRunner(new Executor)
+ val runner = new MesosExecutorBackend(new Executor)
new MesosExecutorDriver(runner).run()
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index c9b0c4e9b6..7f1664b483 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -50,7 +50,7 @@ class ClusterScheduler(sc: SparkContext)
// Listener object to pass upcalls into
var listener: TaskSchedulerListener = null
- var schedContext: ClusterSchedulerContext = null
+ var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
@@ -58,15 +58,15 @@ class ClusterScheduler(sc: SparkContext)
this.listener = listener
}
- def initialize(context: ClusterSchedulerContext) {
- schedContext = context
+ def initialize(context: SchedulerBackend) {
+ backend = context
createJarServer()
}
def newTaskId(): Long = nextTaskId.getAndIncrement()
override def start() {
- schedContext.start()
+ backend.start()
if (System.getProperty("spark.speculation", "false") == "true") {
new Thread("ClusterScheduler speculation check") {
@@ -95,7 +95,7 @@ class ClusterScheduler(sc: SparkContext)
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
}
- schedContext.reviveOffers()
+ backend.reviveOffers()
}
def taskSetFinished(manager: TaskSetManager) {
@@ -197,11 +197,11 @@ class ClusterScheduler(sc: SparkContext)
}
if (failedHost != None) {
listener.hostLost(failedHost.get)
- schedContext.reviveOffers()
+ backend.reviveOffers()
}
if (taskFailed) {
// Also revive offers if a task had failed for some reason other than host lost
- schedContext.reviveOffers()
+ backend.reviveOffers()
}
}
@@ -227,15 +227,15 @@ class ClusterScheduler(sc: SparkContext)
}
override def stop() {
- if (schedContext != null) {
- schedContext.stop()
+ if (backend != null) {
+ backend.stop()
}
if (jarServer != null) {
jarServer.stop()
}
}
- override def defaultParallelism() = schedContext.defaultParallelism()
+ override def defaultParallelism() = backend.defaultParallelism()
// Create a server for all the JARs added by the user to SparkContext.
// We first copy the JARs to a temp directory for easier server setup.
@@ -271,7 +271,7 @@ class ClusterScheduler(sc: SparkContext)
}
}
if (shouldRevive) {
- schedContext.reviveOffers()
+ backend.reviveOffers()
}
}
@@ -288,7 +288,7 @@ class ClusterScheduler(sc: SparkContext)
}
if (failedHost != None) {
listener.hostLost(failedHost.get)
- schedContext.reviveOffers()
+ backend.reviveOffers()
}
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
deleted file mode 100644
index 6b9687ac25..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.scheduler.cluster
-
-trait ClusterSchedulerContext {
- def start(): Unit
- def stop(): Unit
- def reviveOffers(): Unit
- def defaultParallelism(): Int
-
- // TODO: Probably want to add a killTask too
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
new file mode 100644
index 0000000000..897976c3f9
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -0,0 +1,15 @@
+package spark.scheduler.cluster
+
+/**
+ * A backend interface for cluster scheduling systems that allows plugging in different ones under
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * machines become available and can launch tasks on them.
+ */
+trait SchedulerBackend {
+ def start(): Unit
+ def stop(): Unit
+ def reviveOffers(): Unit
+ def defaultParallelism(): Int
+
+ // TODO: Probably want to add a killTask too
+}
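
The new trait keeps the contract between ClusterScheduler and a deploy mode deliberately small: lifecycle calls, a nudge to re-offer resources, and a parallelism hint. A minimal sketch of what a conforming backend must provide (hypothetical, not part of this commit):

    package spark.scheduler.cluster

    // Hypothetical sketch, not in this commit: the smallest backend that
    // satisfies the trait. A real backend would react to reviveOffers() by
    // feeding WorkerOffers to the ClusterScheduler and launching the tasks
    // it hands back.
    class NoOpSchedulerBackend extends SchedulerBackend {
      def start() {}
      def stop() {}
      def reviveOffers() {}
      def defaultParallelism(): Int = Runtime.getRuntime.availableProcessors()
    }
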
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index fad62f96aa..160977372d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -1,5 +1,15 @@
package spark.scheduler.cluster
+import java.nio.channels.Channels
import java.nio.ByteBuffer
+import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
+import spark.util.SerializableByteBuffer
-class TaskDescription(val taskId: Long, val name: String, val serializedTask: ByteBuffer) {}
+class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+ extends Serializable {
+
+ // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
+ private val buffer = new SerializableByteBuffer(_serializedTask)
+
+ def serializedTask: ByteBuffer = buffer.value
+}
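
java.nio.ByteBuffer does not implement java.io.Serializable, so the wrapping shown above is what lets a TaskDescription travel inside a serialized message. An illustrative round trip through plain Java serialization (assuming the wrapper itself is Java-serializable; not part of this commit):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    import spark.scheduler.cluster.TaskDescription

    // Illustrative only: a TaskDescription now survives Java serialization,
    // which is what lets it ride inside an Akka message such as LaunchTask.
    object TaskDescriptionRoundTrip {
      def main(args: Array[String]) {
        val task = new TaskDescription(42L, "task 42", ByteBuffer.wrap("task bytes".getBytes("UTF-8")))

        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(task)
        out.close()

        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        val copy = in.readObject().asInstanceOf[TaskDescription]
        assert(copy.taskId == 42L && copy.serializedTask.remaining() == 10)
      }
    }
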
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f5c35becda..4e95666da1 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,12 +15,12 @@ import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import spark.TaskState
-class MesosScheduler(
+class MesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
frameworkName: String)
- extends ClusterSchedulerContext
+ extends SchedulerBackend
with MScheduler
with Logging {
@@ -58,11 +58,11 @@ class MesosScheduler(
override def start() {
synchronized {
- new Thread("MesosScheduler driver") {
+ new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
- val sched = MesosScheduler.this
+ val sched = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
driver = new MesosSchedulerDriver(sched, fwInfo, master)
try {
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
new file mode 100644
index 0000000000..4f922a51e1
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
@@ -0,0 +1,16 @@
+package spark.scheduler.standalone
+
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.TaskDescription
+
+sealed trait StandaloneClusterMessage extends Serializable
+
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
+ extends StandaloneClusterMessage
+
+case object ReviveOffers extends StandaloneClusterMessage
+case object StopMaster extends StandaloneClusterMessage
+
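
Since the trait is sealed, actors that match on these messages get exhaustiveness checking from the compiler. A hypothetical helper (not part of this commit) spelling out the whole master/slave protocol:

    import spark.scheduler.standalone._

    // Hypothetical helper, not in this commit: one case per message; because
    // StandaloneClusterMessage is sealed, the compiler warns if a new message
    // type is added without a corresponding case.
    object ClusterMessages {
      def describe(msg: StandaloneClusterMessage): String = msg match {
        case RegisterSlave(id, host, cores) => "slave " + id + " on " + host + " offering " + cores + " cores"
        case LaunchTask(id, task) => "launch task " + task.taskId + " on slave " + id
        case StatusUpdate(id, taskId, state, _) => "task " + taskId + " on slave " + id + " is now " + state
        case ReviveOffers => "make fresh resource offers to the scheduler"
        case StopMaster => "shut down the master actor"
      }
    }
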
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
new file mode 100644
index 0000000000..5ace6622aa
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
@@ -0,0 +1,106 @@
+package spark.scheduler.standalone
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import akka.actor.{Props, Actor, ActorRef, ActorSystem}
+import akka.util.duration._
+import akka.pattern.ask
+
+import spark.{SparkException, Logging, TaskState}
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
+import akka.dispatch.Await
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A standalone scheduler backend, which waits for standalone executors to connect to it through
+ * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
+ * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ */
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+ extends SchedulerBackend
+ with Logging {
+
+ // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+ var totalCoreCount = new AtomicInteger(0)
+
+ class MasterActor extends Actor {
+ val slaveActor = new HashMap[String, ActorRef]
+ val slaveHost = new HashMap[String, String]
+ val freeCores = new HashMap[String, Int]
+
+ def receive = {
+ case RegisterSlave(slaveId, host, cores) =>
+ slaveActor(slaveId) = sender
+ logInfo("Registered slave: " + sender + " with ID " + slaveId)
+ slaveHost(slaveId) = host
+ freeCores(slaveId) = cores
+ totalCoreCount.addAndGet(cores)
+ makeOffers()
+
+ case StatusUpdate(slaveId, taskId, state, data) =>
+ scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
+ if (TaskState.isFinished(state)) {
+ freeCores(slaveId) += 1
+ makeOffers(slaveId)
+ }
+
+ case LaunchTask(slaveId, task) =>
+ freeCores(slaveId) -= 1
+ slaveActor(slaveId) ! LaunchTask(slaveId, task)
+
+ case ReviveOffers =>
+ makeOffers()
+
+ case StopMaster =>
+ sender ! true
+ context.stop(self)
+
+ // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
+ }
+
+ // Make fake resource offers on all slaves
+ def makeOffers() {
+ scheduler.resourceOffers(
+ slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+ }
+
+ // Make fake resource offers on just one slave
+ def makeOffers(slaveId: String) {
+ scheduler.resourceOffers(
+ Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+ }
+ }
+
+ var masterActor: ActorRef = null
+ val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+ def start() {
+ masterActor = actorSystem.actorOf(
+ Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+ }
+
+ def stop() {
+ try {
+ if (masterActor != null) {
+ val timeout = 5.seconds
+ val future = masterActor.ask(StopMaster)(timeout)
+ Await.result(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error stopping standalone scheduler master actor", e)
+ }
+ }
+
+ def reviveOffers() {
+ masterActor ! ReviveOffers
+ }
+
+ def defaultParallelism(): Int = totalCoreCount.get()
+}
+
+object StandaloneSchedulerBackend {
+ val ACTOR_NAME = "StandaloneScheduler"
+}
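
The slave side of this Akka protocol is not part of this commit. Purely for orientation, a rough sketch of the counterpart actor (entirely hypothetical):

    import akka.actor.{Actor, ActorRef}

    import spark.TaskState
    import spark.scheduler.standalone.{LaunchTask, RegisterSlave, StatusUpdate}

    // Entirely hypothetical, not in this commit: a slave that registers its
    // cores with the master and, for demonstration, reports every launched
    // task as immediately finished. A real slave would hand
    // task.serializedTask to an Executor and relay its real status updates.
    class SlaveActor(master: ActorRef, slaveId: String, host: String, cores: Int) extends Actor {
      override def preStart() {
        master ! RegisterSlave(slaveId, host, cores)
      }

      def receive = {
        case LaunchTask(_, task) =>
          master ! StatusUpdate(slaveId, task.taskId, TaskState.FINISHED, new Array[Byte](0))
      }
    }
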
diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
new file mode 100644
index 0000000000..f7c8112346
--- /dev/null
+++ b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.nio.ByteBuffer
+import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
+import java.nio.channels.Channels
+
+/**
+ * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
+ */
+class SerializableByteBuffer(@transient var buffer: ByteBuffer) extends Serializable {
+ def value = buffer
+
+ private def readObject(in: ObjectInputStream) {
+ val length = in.readInt()
+ buffer = ByteBuffer.allocate(length)
+ var amountRead = 0
+ val channel = Channels.newChannel(in)
+ while (amountRead < length) {
+ val ret = channel.read(buffer)
+ if (ret == -1) {
+ throw new EOFException("End of file before fully reading buffer")
+ }
+ amountRead += ret
+ }
+ buffer.rewind() // Allow us to read it later
+ }
+
+ private def writeObject(out: ObjectOutputStream) {
+ out.writeInt(buffer.limit())
+ if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+ throw new IOException("Could not fully write buffer to output stream")
+ }
+ buffer.rewind() // Allow us to write it again later
+ }
+}
diff --git a/spark-executor b/spark-executor
index 2d6934f7da..b66c374ca8 100755
--- a/spark-executor
+++ b/spark-executor
@@ -1,4 +1,4 @@
#!/bin/sh
FWDIR="`dirname $0`"
echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorRunner
+exec $FWDIR/run spark.executor.MesosExecutorBackend