diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-06 20:17:44 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-06 20:17:44 -0700 |
commit | c5cc10cda3c0e86773c67684eaa4876ab515fdb7 (patch) | |
tree | 4ae7f2584eec0af448be1480bfb4b690f99f08cc /core | |
parent | 909b325243ebedfc1bd47fc3d7f70cde178508fc (diff) | |
download | spark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.tar.gz spark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.tar.bz2 spark-c5cc10cda3c0e86773c67684eaa4876ab515fdb7.zip |
More work on standalone scheduler
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 81 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala | 29 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala (renamed from core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala) | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/SerializableBuffer.scala (renamed from core/src/main/scala/spark/util/SerializableByteBuffer.scala) | 5 |
8 files changed, 122 insertions, 33 deletions
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala new file mode 100644 index 0000000000..b717ed2b77 --- /dev/null +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -0,0 +1,81 @@ +package spark.executor + +import java.nio.ByteBuffer +import spark.Logging +import spark.TaskState.TaskState +import spark.util.AkkaUtils +import akka.actor.{ActorRef, Actor, Props} +import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} +import akka.remote.RemoteClientLifeCycleEvent +import spark.scheduler.standalone._ +import spark.scheduler.standalone.RegisteredSlave +import spark.scheduler.standalone.LaunchTask +import spark.scheduler.standalone.RegisterSlaveFailed +import spark.scheduler.standalone.RegisterSlave + + +class StandaloneExecutorBackend( + executor: Executor, + masterUrl: String, + slaveId: String, + hostname: String, + cores: Int) + extends Actor + with ExecutorBackend + with Logging { + + val threadPool = new ThreadPoolExecutor( + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) + + var master: ActorRef = null + + override def preStart() { + try { + master = context.actorFor(masterUrl) + master ! RegisterSlave(slaveId, hostname, cores) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } catch { + case e: Exception => + logError("Failed to connect to master", e) + System.exit(1) + } + } + + override def receive = { + case RegisteredSlave(sparkProperties) => + logInfo("Successfully registered with master") + executor.initialize(hostname, sparkProperties) + + case RegisterSlaveFailed(message) => + logError("Slave registration failed: " + message) + System.exit(1) + + case LaunchTask(slaveId_, taskDesc) => + executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + } + + override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { + master ! StatusUpdate(slaveId, taskId, state, data) + } +} + +object StandaloneExecutorBackend { + def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) { + // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor + // before getting started with all our system properties, etc + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) + val actor = actorSystem.actorOf( + Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)), + name = "Executor") + actorSystem.awaitTermination() + } + + def main(args: Array[String]) { + if (args.length != 4) { + System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>") + System.exit(1) + } + run(args(0), args(1), args(2), args(3).toInt) + } +} diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala new file mode 100644 index 0000000000..7f19fe0cc5 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -0,0 +1,29 @@ +package spark.scheduler.cluster + +import spark.TaskState.TaskState +import java.nio.ByteBuffer +import spark.util.SerializableBuffer + +sealed trait StandaloneClusterMessage extends Serializable + +// Master to slaves +case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage +case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage +case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage + +// Slaves to master +case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage + +case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer) + extends StandaloneClusterMessage + +object StatusUpdate { + /** Alternate factory method that takes a ByteBuffer directly for the data field */ + def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { + StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data)) + } +} + +// Internal messages in master +case object ReviveOffers extends StandaloneClusterMessage +case object StopMaster extends StandaloneClusterMessage diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 5ace6622aa..1acf9e86de 100644 --- a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -1,4 +1,4 @@ -package spark.scheduler.standalone +package spark.scheduler.cluster import scala.collection.mutable.{HashMap, HashSet} @@ -7,10 +7,7 @@ import akka.util.duration._ import akka.pattern.ask import spark.{SparkException, Logging, TaskState} -import spark.TaskState.TaskState -import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend} import akka.dispatch.Await -import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger /** @@ -19,8 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*). */ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) - extends SchedulerBackend - with Logging { + extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) @@ -40,7 +36,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor makeOffers() case StatusUpdate(slaveId, taskId, state, data) => - scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data)) + scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { freeCores(slaveId) += 1 makeOffers(slaveId) @@ -90,7 +86,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } catch { case e: Exception => - throw new SparkException("Error stopping standalone scheduler master actor", e) + throw new SparkException("Error stopping standalone scheduler's master actor", e) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala index 160977372d..b0b3cbe7d5 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala @@ -1,15 +1,13 @@ package spark.scheduler.cluster -import java.nio.channels.Channels import java.nio.ByteBuffer -import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream} -import spark.util.SerializableByteBuffer +import spark.util.SerializableBuffer class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer) extends Serializable { - // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer - private val buffer = new SerializableByteBuffer(_serializedTask) + // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer + private val buffer = new SerializableBuffer(_serializedTask) def serializedTask: ByteBuffer = buffer.value } diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala index 51fb3dc72f..0a6e1350be 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala @@ -1,5 +1,6 @@ package spark.scheduler.mesos +/* import java.io.{File, FileInputStream, FileOutputStream} import java.util.{ArrayList => JArrayList} import java.util.{List => JList} @@ -32,7 +33,6 @@ import spark._ import spark.scheduler._ import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler} -/* sealed trait CoarseMesosSchedulerMessage case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 4e95666da1..110b178582 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -80,7 +80,7 @@ class MesosSchedulerBackend( } def createExecutorInfo(): ExecutorInfo = { - val sparkHome = sc.getSparkHome match { + val sparkHome = sc.getSparkHome() match { case Some(path) => path case None => diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala deleted file mode 100644 index 4f922a51e1..0000000000 --- a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala +++ /dev/null @@ -1,16 +0,0 @@ -package spark.scheduler.standalone - -import spark.TaskState.TaskState -import spark.scheduler.cluster.TaskDescription - -sealed trait StandaloneClusterMessage extends Serializable - -case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage -case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage - -case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte]) - extends StandaloneClusterMessage - -case object ReviveOffers extends StandaloneClusterMessage -case object StopMaster extends StandaloneClusterMessage - diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala index f7c8112346..fa9e6d62cb 100644 --- a/core/src/main/scala/spark/util/SerializableByteBuffer.scala +++ b/core/src/main/scala/spark/util/SerializableBuffer.scala @@ -5,9 +5,10 @@ import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream import java.nio.channels.Channels /** - * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization. + * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make + * it easier to pass ByteBuffers in case class messages. */ -class SerializableByteBuffer(@transient var buffer: ByteBuffer) { +class SerializableBuffer(@transient var buffer: ByteBuffer) { def value = buffer private def readObject(in: ObjectInputStream) { |