From 909b325243ebedfc1bd47fc3d7f70cde178508fc Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 6 Jul 2012 17:56:44 -0700
Subject: Further refactoring, and start of a standalone scheduler backend

---
 core/src/main/scala/spark/SparkContext.scala       |   6 +-
 core/src/main/scala/spark/executor/Executor.scala  |   4 +-
 .../scala/spark/executor/ExecutorBackend.scala     |  11 +
 .../scala/spark/executor/ExecutorContext.scala     |  11 -
 .../spark/executor/MesosExecutorBackend.scala      |  69 +++++
 .../scala/spark/executor/MesosExecutorRunner.scala |  69 -----
 .../spark/scheduler/cluster/ClusterScheduler.scala |  24 +-
 .../cluster/ClusterSchedulerContext.scala          |  10 -
 .../spark/scheduler/cluster/SchedulerBackend.scala |  15 ++
 .../spark/scheduler/cluster/TaskDescription.scala  |  12 +-
 .../spark/scheduler/mesos/MesosScheduler.scala     | 291 ---------------------
 .../scheduler/mesos/MesosSchedulerBackend.scala    | 291 +++++++++++++++++++++
 .../standalone/StandaloneClusterMessage.scala      |  16 ++
 .../standalone/StandaloneSchedulerBackend.scala    | 106 ++++++++
 .../scala/spark/util/SerializableByteBuffer.scala  |  35 +++
 spark-executor                                     |   2 +-
 16 files changed, 572 insertions(+), 400 deletions(-)
 create mode 100644 core/src/main/scala/spark/executor/ExecutorBackend.scala
 delete mode 100644 core/src/main/scala/spark/executor/ExecutorContext.scala
 create mode 100644 core/src/main/scala/spark/executor/MesosExecutorBackend.scala
 delete mode 100644 core/src/main/scala/spark/executor/MesosExecutorRunner.scala
 delete mode 100644 core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
 create mode 100644 core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
 delete mode 100644 core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
 create mode 100644 core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
 create mode 100644 core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
 create mode 100644 core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
 create mode 100644 core/src/main/scala/spark/util/SerializableByteBuffer.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2e8cb609b2..8a06642426 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosScheduler
+import spark.scheduler.mesos.MesosSchedulerBackend
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -90,14 +90,14 @@ class SparkContext(
       case _ =>
         MesosNativeLibrary.load()
         val sched = new ClusterScheduler(this)
-        val schedContext = new MesosScheduler(sched, this, master, frameworkName)
+        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
         sched.initialize(schedContext)
         sched
         /*
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
-          new MesosScheduler(this, master, frameworkName)
+          new MesosSchedulerBackend(this, master, frameworkName)
         }
         */
     }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ac30ae9aec..e3958cec51 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -47,11 +47,11 @@ class Executor extends Logging {
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
 
-  def launchTask(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer) {
+  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
-  class TaskRunner(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer)
+  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
     override def run() {
diff --git a/core/src/main/scala/spark/executor/ExecutorBackend.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
new file mode 100644
index 0000000000..24c8776f31
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -0,0 +1,11 @@
+package spark.executor
+
+import java.nio.ByteBuffer
+import spark.TaskState.TaskState
+
+/**
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
+ */
+trait ExecutorBackend {
+  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+}
diff --git a/core/src/main/scala/spark/executor/ExecutorContext.scala b/core/src/main/scala/spark/executor/ExecutorContext.scala
deleted file mode 100644
index 6b86d8d18a..0000000000
--- a/core/src/main/scala/spark/executor/ExecutorContext.scala
+++ /dev/null
@@ -1,11 +0,0 @@
-package spark.executor
-
-import java.nio.ByteBuffer
-import spark.TaskState.TaskState
-
-/**
- * Interface used by Executor to send back updates to the cluster scheduler.
- */
-trait ExecutorContext {
-  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
-}
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
new file mode 100644
index 0000000000..50f4e41ede
--- /dev/null
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -0,0 +1,69 @@
+package spark.executor
+
+import java.nio.ByteBuffer
+import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
+import spark.TaskState.TaskState
+import com.google.protobuf.ByteString
+import spark.{Utils, Logging}
+import spark.TaskState
+
+class MesosExecutorBackend(executor: Executor)
+  extends MesosExecutor
+  with ExecutorBackend
+  with Logging {
+
+  var driver: ExecutorDriver = null
+
+  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
+    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
+      .setTaskId(mesosTaskId)
+      .setState(TaskState.toMesos(state))
+      .setData(ByteString.copyFrom(data))
+      .build())
+  }
+
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkInfo: FrameworkInfo,
+      slaveInfo: SlaveInfo) {
+    this.driver = driver
+    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+    executor.initialize(slaveInfo.getHostname, properties)
+  }
+
+  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
+    val taskId = taskInfo.getTaskId.getValue.toLong
+    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+  }
+
+  override def error(d: ExecutorDriver, message: String) {
+    logError("Error from Mesos: " + message)
+  }
+
+  override def killTask(d: ExecutorDriver, t: TaskID) {
+    logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
+  }
+
+  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
+
+  override def disconnected(d: ExecutorDriver) {}
+
+  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
+
+  override def shutdown(d: ExecutorDriver) {}
+}
+
+/**
+ * Entry point for Mesos executor.
+ */
+object MesosExecutorBackend {
+  def main(args: Array[String]) {
+    MesosNativeLibrary.load()
+    // Create a new Executor and start it running
+    val runner = new MesosExecutorBackend(new Executor)
+    new MesosExecutorDriver(runner).run()
+  }
+}
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
deleted file mode 100644
index f97d9d0bfa..0000000000
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package spark.executor
-
-import java.nio.ByteBuffer
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
-import spark.TaskState.TaskState
-import com.google.protobuf.ByteString
-import spark.{Utils, Logging}
-import spark.TaskState
-
-class MesosExecutorRunner(executor: Executor)
-  extends MesosExecutor
-  with ExecutorContext
-  with Logging {
-
-  var driver: ExecutorDriver = null
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
-      .setData(ByteString.copyFrom(data))
-      .build())
-  }
-
-  override def registered(
-      driver: ExecutorDriver,
-      executorInfo: ExecutorInfo,
-      frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo) {
-    this.driver = driver
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(slaveInfo.getHostname, properties)
-  }
-
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
-    val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
-  }
-
-  override def error(d: ExecutorDriver, message: String) {
-    logError("Error from Mesos: " + message)
-  }
-
-  override def killTask(d: ExecutorDriver, t: TaskID) {
-    logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
-  }
-
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
-  override def disconnected(d: ExecutorDriver) {}
-
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
-  override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-object MesosExecutorRunner {
-  def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorRunner(new Executor)
-    new MesosExecutorDriver(runner).run()
-  }
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index c9b0c4e9b6..7f1664b483 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -50,7 +50,7 @@ class ClusterScheduler(sc: SparkContext)
 
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
-  var schedContext: ClusterSchedulerContext = null
+  var backend: SchedulerBackend = null
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
@@ -58,15 +58,15 @@ class ClusterScheduler(sc: SparkContext)
     this.listener = listener
   }
 
-  def initialize(context: ClusterSchedulerContext) {
-    schedContext = context
+  def initialize(context: SchedulerBackend) {
+    backend = context
     createJarServer()
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
 
   override def start() {
-    schedContext.start()
+    backend.start()
 
     if (System.getProperty("spark.speculation", "false") == "true") {
       new Thread("ClusterScheduler speculation check") {
@@ -95,7 +95,7 @@ class ClusterScheduler(sc: SparkContext)
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
     }
-    schedContext.reviveOffers()
+    backend.reviveOffers()
   }
 
   def taskSetFinished(manager: TaskSetManager) {
@@ -197,11 +197,11 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
      listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
     if (taskFailed) {
       // Also revive offers if a task had failed for some reason other than host lost
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -227,15 +227,15 @@ class ClusterScheduler(sc: SparkContext)
   }
 
   override def stop() {
-    if (schedContext != null) {
-      schedContext.stop()
+    if (backend != null) {
+      backend.stop()
     }
     if (jarServer != null) {
       jarServer.stop()
     }
   }
 
-  override def defaultParallelism() = schedContext.defaultParallelism()
+  override def defaultParallelism() = backend.defaultParallelism()
 
   // Create a server for all the JARs added by the user to SparkContext.
   // We first copy the JARs to a temp directory for easier server setup.
@@ -271,7 +271,7 @@ class ClusterScheduler(sc: SparkContext)
       }
     }
     if (shouldRevive) {
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -288,7 +288,7 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
deleted file mode 100644
index 6b9687ac25..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.scheduler.cluster
-
-trait ClusterSchedulerContext {
-  def start(): Unit
-  def stop(): Unit
-  def reviveOffers(): Unit
-  def defaultParallelism(): Int
-
-  // TODO: Probably want to add a killTask too
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
new file mode 100644
index 0000000000..897976c3f9
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -0,0 +1,15 @@
+package spark.scheduler.cluster
+
+/**
+ * A backend interface for cluster scheduling systems that allows plugging in different ones under
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * machines become available and can launch tasks on them.
+ */
+trait SchedulerBackend {
+  def start(): Unit
+  def stop(): Unit
+  def reviveOffers(): Unit
+  def defaultParallelism(): Int
+
+  // TODO: Probably want to add a killTask too
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index fad62f96aa..160977372d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -1,5 +1,15 @@
 package spark.scheduler.cluster
 
+import java.nio.channels.Channels
 import java.nio.ByteBuffer
+import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
+import spark.util.SerializableByteBuffer
 
-class TaskDescription(val taskId: Long, val name: String, val serializedTask: ByteBuffer) {}
+class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+  extends Serializable {
+
+  // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
+  private val buffer = new SerializableByteBuffer(_serializedTask)
+
+  def serializedTask: ByteBuffer = buffer.value
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
deleted file mode 100644
index f5c35becda..0000000000
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-package spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import spark.{SparkException, Utils, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import spark.TaskState
-
-class MesosScheduler(
-    scheduler: ClusterScheduler,
-    sc: SparkContext,
-    master: String,
-    frameworkName: String)
-  extends ClusterSchedulerContext
-  with MScheduler
-  with Logging {
-
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS"
-  )
-
-  // Memory used by each executor (in megabytes)
-  val EXECUTOR_MEMORY = {
-    if (System.getenv("SPARK_MEM") != null) {
-      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
-      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    } else {
-      512
-    }
-  }
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
-  val taskIdToSlaveId = new HashMap[Long, String]
-
-  // An ExecutorInfo for our tasks
-  var executorInfo: ExecutorInfo = null
-
-  override def start() {
-    synchronized {
-      new Thread("MesosScheduler driver") {
-        setDaemon(true)
-
-        override def run() {
-          val sched = MesosScheduler.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
-          driver = new MesosSchedulerDriver(sched, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      executorInfo = createExecutorInfo()
-      waitForRegister()
-    }
-  }
-
-  def createExecutorInfo(): ExecutorInfo = {
-    val sparkHome = sc.getSparkHome match {
-      case Some(path) =>
-        path
-      case None =>
-        throw new SparkException("Spark home is not set; set it through the spark.home system " +
-          "property, the SPARK_HOME environment variable or the SparkContext constructor")
-    }
-    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
-    val environment = Environment.newBuilder()
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
-        environment.addVariables(Environment.Variable.newBuilder()
-          .setName(key)
-          .setValue(System.getenv(key))
-          .build())
-      }
-    }
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
-      .build()
-    val command = CommandInfo.newBuilder()
-      .setValue(execScript)
-      .setEnvironment(environment)
-      .build()
-    ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
-      .setCommand(command)
-      .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(memory)
-      .build()
-  }
-
-  /**
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
-   * containing all the spark.* system properties in the form of (String, String) pairs.
-   */
-  private def createExecArg(): Array[Byte] = {
-    val props = new HashMap[String, String]
-    val iterator = System.getProperties.entrySet.iterator
-    while (iterator.hasNext) {
-      val entry = iterator.next
-      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-      if (key.startsWith("spark.")) {
-        props(key) = value
-      }
-    }
-    // Serialize the map as an array of (String, String) pairs
-    return Utils.serialize(props.toArray)
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      // Build a big list of the offerable workers, and remember their indices so that we can
-      // figure out which Offer to reply to for each worker
-      val offerableIndices = new ArrayBuffer[Int]
-      val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
-      def enoughMemory(o: Offer) = {
-        val mem = getResource(o.getResourcesList, "mem")
-        val slaveId = o.getSlaveId.getValue
-        mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId)
-      }
-
-      for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-        offerableIndices += index
-        offerableWorkers += new WorkerOffer(
-          offer.getSlaveId.getValue,
-          offer.getHostname,
-          getResource(offer.getResourcesList, "cpus").toInt)
-      }
-
-      // Call into the ClusterScheduler
-      val taskLists = scheduler.resourceOffers(offerableWorkers)
-
-      // Build a list of Mesos tasks for each slave
-      val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
-      for ((taskList, index) <- taskLists.zipWithIndex) {
-        if (!taskList.isEmpty) {
-          val offerNum = offerableIndices(index)
-          val slaveId = offers(offerNum).getSlaveId.getValue
-          slaveIdsWithExecutors += slaveId
-          mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
-          for (taskDesc <- taskList) {
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
-            mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
-          }
-        }
-      }
-
-      // Reply to the offers
-      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-      for (i <- 0 until offers.size) {
-        d.launchTasks(offers(i).getId, mesosTasks(i), filters)
-      }
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-      .build()
-    return MesosTaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(executorInfo)
-      .setName(task.name)
-      .addResources(cpuResource)
-      .setData(ByteString.copyFrom(task.serializedTask))
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-    state == MesosTaskState.TASK_FAILED ||
-    state == MesosTaskState.TASK_KILLED ||
-    state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val tid = status.getTaskId.getValue.toLong
-    val state = TaskState.fromMesos(status.getState)
-    synchronized {
-      if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
-        // We lost the executor on this slave, so remember that it's gone
-        slaveIdsWithExecutors -= taskIdToSlaveId(tid)
-      }
-      if (isFinished(status.getState)) {
-        taskIdToSlaveId.remove(tid)
-      }
-    }
-    scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
-    scheduler.error(message)
-  }
-
-  override def stop() {
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def reviveOffers() {
-    driver.reviveOffers()
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      slaveIdsWithExecutors -= slaveId.getValue
-    }
-    scheduler.slaveLost(slaveId.toString)
-  }
-
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
-  }
-
-  // TODO: query Mesos for number of cores
-  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
-}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
new file mode 100644
index 0000000000..4e95666da1
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -0,0 +1,291 @@
+package spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import spark.TaskState
+
+class MesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    frameworkName: String)
+  extends SchedulerBackend
+  with MScheduler
+  with Logging {
+
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH",
+    "SPARK_JAVA_OPTS"
+  )
+
+  // Memory used by each executor (in megabytes)
+  val EXECUTOR_MEMORY = {
+    if (System.getenv("SPARK_MEM") != null) {
+      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    } else {
+      512
+    }
+  }
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Which slave IDs we have executors on
+  val slaveIdsWithExecutors = new HashSet[String]
+  val taskIdToSlaveId = new HashMap[Long, String]
+
+  // An ExecutorInfo for our tasks
+  var executorInfo: ExecutorInfo = null
+
+  override def start() {
+    synchronized {
+      new Thread("MesosSchedulerBackend driver") {
+        setDaemon(true)
+
+        override def run() {
+          val sched = MesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+          driver = new MesosSchedulerDriver(sched, fwInfo, master)
+          try {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      executorInfo = createExecutorInfo()
+      waitForRegister()
+    }
+  }
+
+  def createExecutorInfo(): ExecutorInfo = {
+    val sparkHome = sc.getSparkHome match {
+      case Some(path) =>
+        path
+      case None =>
+        throw new SparkException("Spark home is not set; set it through the spark.home system " +
+          "property, the SPARK_HOME environment variable or the SparkContext constructor")
+    }
+    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
+    val environment = Environment.newBuilder()
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null) {
+        environment.addVariables(Environment.Variable.newBuilder()
+          .setName(key)
+          .setValue(System.getenv(key))
+          .build())
+      }
+    }
+    val memory = Resource.newBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+      .build()
+    val command = CommandInfo.newBuilder()
+      .setValue(execScript)
+      .setEnvironment(environment)
+      .build()
+    ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+      .setCommand(command)
+      .setData(ByteString.copyFrom(createExecArg()))
+      .addResources(memory)
+      .build()
+  }
+
+  /**
+   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+   * containing all the spark.* system properties in the form of (String, String) pairs.
+ */ + private def createExecArg(): Array[Byte] = { + val props = new HashMap[String, String] + val iterator = System.getProperties.entrySet.iterator + while (iterator.hasNext) { + val entry = iterator.next + val (key, value) = (entry.getKey.toString, entry.getValue.toString) + if (key.startsWith("spark.")) { + props(key) = value + } + } + // Serialize the map as an array of (String, String) pairs + return Utils.serialize(props.toArray) + } + + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} + + override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + logInfo("Registered as framework ID " + frameworkId.getValue) + registeredLock.synchronized { + isRegistered = true + registeredLock.notifyAll() + } + } + + def waitForRegister() { + registeredLock.synchronized { + while (!isRegistered) { + registeredLock.wait() + } + } + } + + override def disconnected(d: SchedulerDriver) {} + + override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + + /** + * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets + * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that + * tasks are balanced across the cluster. + */ + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + synchronized { + // Build a big list of the offerable workers, and remember their indices so that we can + // figure out which Offer to reply to for each worker + val offerableIndices = new ArrayBuffer[Int] + val offerableWorkers = new ArrayBuffer[WorkerOffer] + + def enoughMemory(o: Offer) = { + val mem = getResource(o.getResourcesList, "mem") + val slaveId = o.getSlaveId.getValue + mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId) + } + + for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { + offerableIndices += index + offerableWorkers += new WorkerOffer( + offer.getSlaveId.getValue, + offer.getHostname, + getResource(offer.getResourcesList, "cpus").toInt) + } + + // Call into the ClusterScheduler + val taskLists = scheduler.resourceOffers(offerableWorkers) + + // Build a list of Mesos tasks for each slave + val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) + for ((taskList, index) <- taskLists.zipWithIndex) { + if (!taskList.isEmpty) { + val offerNum = offerableIndices(index) + val slaveId = offers(offerNum).getSlaveId.getValue + slaveIdsWithExecutors += slaveId + mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) + for (taskDesc <- taskList) { + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) + } + } + } + + // Reply to the offers + val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
+      for (i <- 0 until offers.size) {
+        d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+      }
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Turn a Spark TaskDescription into a Mesos task */
+  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+    val cpuResource = Resource.newBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+      .build()
+    return MesosTaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setExecutor(executorInfo)
+      .setName(task.name)
+      .addResources(cpuResource)
+      .setData(ByteString.copyFrom(task.serializedTask))
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+    state == MesosTaskState.TASK_FAILED ||
+    state == MesosTaskState.TASK_KILLED ||
+    state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val tid = status.getTaskId.getValue.toLong
+    val state = TaskState.fromMesos(status.getState)
+    synchronized {
+      if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+        // We lost the executor on this slave, so remember that it's gone
+        slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+      }
+      if (isFinished(status.getState)) {
+        taskIdToSlaveId.remove(tid)
+      }
+    }
+    scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def reviveOffers() {
+    driver.reviveOffers()
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    synchronized {
+      slaveIdsWithExecutors -= slaveId.getValue
+    }
+    scheduler.slaveLost(slaveId.toString)
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+    slaveLost(d, s)
+  }
+
+  // TODO: query Mesos for number of cores
+  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+}
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
new file mode 100644
index 0000000000..4f922a51e1
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
@@ -0,0 +1,16 @@
+package spark.scheduler.standalone
+
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.TaskDescription
+
+sealed trait StandaloneClusterMessage extends Serializable
+
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
+  extends StandaloneClusterMessage
+
+case object ReviveOffers extends StandaloneClusterMessage
+case object StopMaster extends StandaloneClusterMessage
+
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
new file mode 100644
index 0000000000..5ace6622aa
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
@@ -0,0 +1,106 @@
+package spark.scheduler.standalone
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import akka.actor.{Props, Actor, ActorRef, ActorSystem}
+import akka.util.duration._
+import akka.pattern.ask
+
+import spark.{SparkException, Logging, TaskState}
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
+import akka.dispatch.Await
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A standalone scheduler backend, which waits for standalone executors to connect to it through
+ * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
+ * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ */
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+  extends SchedulerBackend
+  with Logging {
+
+  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+  var totalCoreCount = new AtomicInteger(0)
+
+  class MasterActor extends Actor {
+    val slaveActor = new HashMap[String, ActorRef]
+    val slaveHost = new HashMap[String, String]
+    val freeCores = new HashMap[String, Int]
+
+    def receive = {
+      case RegisterSlave(slaveId, host, cores) =>
+        slaveActor(slaveId) = sender
+        logInfo("Registered slave: " + sender + " with ID " + slaveId)
+        slaveHost(slaveId) = host
+        freeCores(slaveId) = cores
+        totalCoreCount.addAndGet(cores)
+        makeOffers()
+
+      case StatusUpdate(slaveId, taskId, state, data) =>
+        scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
+        if (TaskState.isFinished(state)) {
+          freeCores(slaveId) += 1
+          makeOffers(slaveId)
+        }
+
+      case LaunchTask(slaveId, task) =>
+        freeCores(slaveId) -= 1
+        slaveActor(slaveId) ! LaunchTask(slaveId, task)
+
+      case ReviveOffers =>
+        makeOffers()
+
+      case StopMaster =>
+        sender ! true
+        context.stop(self)
+
+      // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
+    }
+
+    // Make fake resource offers on all slaves
+    def makeOffers() {
+      scheduler.resourceOffers(
+        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+    }
+
+    // Make fake resource offers on just one slave
+    def makeOffers(slaveId: String) {
+      scheduler.resourceOffers(
+        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+    }
+  }
+
+  var masterActor: ActorRef = null
+  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+  def start() {
+    masterActor = actorSystem.actorOf(
+      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+  }
+
+  def stop() {
+    try {
+      if (masterActor != null) {
+        val timeout = 5.seconds
+        val future = masterActor.ask(StopMaster)(timeout)
+        Await.result(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error stopping standalone scheduler master actor", e)
+    }
+  }
+
+  def reviveOffers() {
+    masterActor ! ReviveOffers
+  }
+
+  def defaultParallelism(): Int = totalCoreCount.get()
+}
+
+object StandaloneSchedulerBackend {
+  val ACTOR_NAME = "StandaloneScheduler"
+}
diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
new file mode 100644
index 0000000000..f7c8112346
--- /dev/null
+++ b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.nio.ByteBuffer
+import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
+import java.nio.channels.Channels
+
+/**
+ * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
+ */
+class SerializableByteBuffer(@transient var buffer: ByteBuffer) {
+  def value = buffer
+
+  private def readObject(in: ObjectInputStream) {
+    val length = in.readInt()
+    buffer = ByteBuffer.allocate(length)
+    var amountRead = 0
+    val channel = Channels.newChannel(in)
+    while (amountRead < length) {
+      val ret = channel.read(buffer)
+      if (ret == -1) {
+        throw new EOFException("End of file before fully reading buffer")
+      }
+      amountRead += ret
+    }
+    buffer.rewind() // Allow us to read it later
+  }
+
+  private def writeObject(out: ObjectOutputStream) {
+    out.writeInt(buffer.limit())
+    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+      throw new IOException("Could not fully write buffer to output stream")
+    }
+    buffer.rewind() // Allow us to write it again later
+  }
+}
diff --git a/spark-executor b/spark-executor
index 2d6934f7da..b66c374ca8 100755
--- a/spark-executor
+++ b/spark-executor
@@ -1,4 +1,4 @@
 #!/bin/sh
 FWDIR="`dirname $0`"
 echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorRunner
+exec $FWDIR/run spark.executor.MesosExecutorBackend
-- 
cgit v1.2.3