aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-07-06 20:17:44 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-07-06 20:17:44 -0700
commitc5cc10cda3c0e86773c67684eaa4876ab515fdb7 (patch)
tree4ae7f2584eec0af448be1480bfb4b690f99f08cc
parent909b325243ebedfc1bd47fc3d7f70cde178508fc (diff)
downloadspark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.tar.gz
spark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.tar.bz2
spark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.zip
More work on standalone scheduler
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala81
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala29
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala (renamed from core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala)12
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala8
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala16
-rw-r--r--core/src/main/scala/spark/util/SerializableBuffer.scala (renamed from core/src/main/scala/spark/util/SerializableByteBuffer.scala)5
8 files changed, 122 insertions, 33 deletions
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
new file mode 100644
index 0000000000..b717ed2b77
--- /dev/null
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -0,0 +1,81 @@
+package spark.executor
+
+import java.nio.ByteBuffer
+import spark.Logging
+import spark.TaskState.TaskState
+import spark.util.AkkaUtils
+import akka.actor.{ActorRef, Actor, Props}
+import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
+import akka.remote.RemoteClientLifeCycleEvent
+import spark.scheduler.standalone._
+import spark.scheduler.standalone.RegisteredSlave
+import spark.scheduler.standalone.LaunchTask
+import spark.scheduler.standalone.RegisterSlaveFailed
+import spark.scheduler.standalone.RegisterSlave
+
+
+class StandaloneExecutorBackend(
+ executor: Executor,
+ masterUrl: String,
+ slaveId: String,
+ hostname: String,
+ cores: Int)
+ extends Actor
+ with ExecutorBackend
+ with Logging {
+
+ val threadPool = new ThreadPoolExecutor(
+ 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
+
+ var master: ActorRef = null
+
+ override def preStart() {
+ try {
+ master = context.actorFor(masterUrl)
+ master ! RegisterSlave(slaveId, hostname, cores)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to master", e)
+ System.exit(1)
+ }
+ }
+
+ override def receive = {
+ case RegisteredSlave(sparkProperties) =>
+ logInfo("Successfully registered with master")
+ executor.initialize(hostname, sparkProperties)
+
+ case RegisterSlaveFailed(message) =>
+ logError("Slave registration failed: " + message)
+ System.exit(1)
+
+ case LaunchTask(slaveId_, taskDesc) =>
+ executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ }
+
+ override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+ master ! StatusUpdate(slaveId, taskId, state, data)
+ }
+}
+
+object StandaloneExecutorBackend {
+ def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
+ // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+ // before getting started with all our system properties, etc
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val actor = actorSystem.actorOf(
+ Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)),
+ name = "Executor")
+ actorSystem.awaitTermination()
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>")
+ System.exit(1)
+ }
+ run(args(0), args(1), args(2), args(3).toInt)
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
new file mode 100644
index 0000000000..7f19fe0cc5
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -0,0 +1,29 @@
+package spark.scheduler.cluster
+
+import spark.TaskState.TaskState
+import java.nio.ByteBuffer
+import spark.util.SerializableBuffer
+
+sealed trait StandaloneClusterMessage extends Serializable
+
+// Master to slaves
+case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+
+// Slaves to master
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+ extends StandaloneClusterMessage
+
+object StatusUpdate {
+ /** Alternate factory method that takes a ByteBuffer directly for the data field */
+ def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
+ StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
+ }
+}
+
+// Internal messages in master
+case object ReviveOffers extends StandaloneClusterMessage
+case object StopMaster extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 5ace6622aa..1acf9e86de 100644
--- a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -1,4 +1,4 @@
-package spark.scheduler.standalone
+package spark.scheduler.cluster
import scala.collection.mutable.{HashMap, HashSet}
@@ -7,10 +7,7 @@ import akka.util.duration._
import akka.pattern.ask
import spark.{SparkException, Logging, TaskState}
-import spark.TaskState.TaskState
-import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
import akka.dispatch.Await
-import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
/**
@@ -19,8 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
- extends SchedulerBackend
- with Logging {
+ extends SchedulerBackend with Logging {
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
@@ -40,7 +36,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
makeOffers()
case StatusUpdate(slaveId, taskId, state, data) =>
- scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
+ scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
freeCores(slaveId) += 1
makeOffers(slaveId)
@@ -90,7 +86,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
} catch {
case e: Exception =>
- throw new SparkException("Error stopping standalone scheduler master actor", e)
+ throw new SparkException("Error stopping standalone scheduler's master actor", e)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index 160977372d..b0b3cbe7d5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -1,15 +1,13 @@
package spark.scheduler.cluster
-import java.nio.channels.Channels
import java.nio.ByteBuffer
-import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
-import spark.util.SerializableByteBuffer
+import spark.util.SerializableBuffer
class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
extends Serializable {
- // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
- private val buffer = new SerializableByteBuffer(_serializedTask)
+ // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
+ private val buffer = new SerializableBuffer(_serializedTask)
def serializedTask: ByteBuffer = buffer.value
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
index 51fb3dc72f..0a6e1350be 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
@@ -1,5 +1,6 @@
package spark.scheduler.mesos
+/*
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.{ArrayList => JArrayList}
import java.util.{List => JList}
@@ -32,7 +33,6 @@ import spark._
import spark.scheduler._
import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler}
-/*
sealed trait CoarseMesosSchedulerMessage
case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 4e95666da1..110b178582 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -80,7 +80,7 @@ class MesosSchedulerBackend(
}
def createExecutorInfo(): ExecutorInfo = {
- val sparkHome = sc.getSparkHome match {
+ val sparkHome = sc.getSparkHome() match {
case Some(path) =>
path
case None =>
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
deleted file mode 100644
index 4f922a51e1..0000000000
--- a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package spark.scheduler.standalone
-
-import spark.TaskState.TaskState
-import spark.scheduler.cluster.TaskDescription
-
-sealed trait StandaloneClusterMessage extends Serializable
-
-case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
-case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
-
-case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
- extends StandaloneClusterMessage
-
-case object ReviveOffers extends StandaloneClusterMessage
-case object StopMaster extends StandaloneClusterMessage
-
diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index f7c8112346..fa9e6d62cb 100644
--- a/core/src/main/scala/spark/util/SerializableByteBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -5,9 +5,10 @@ import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream
import java.nio.channels.Channels
/**
- * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
+ * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
+ * it easier to pass ByteBuffers in case class messages.
*/
-class SerializableByteBuffer(@transient var buffer: ByteBuffer) {
+class SerializableBuffer(@transient var buffer: ByteBuffer) {
def value = buffer
private def readObject(in: ObjectInputStream) {