path: root/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/local/LocalScheduler.scala')
1 files changed, 186 insertions, 66 deletions
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 482d1cc853..f274b1a767 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -1,21 +1,74 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package spark.scheduler.local
import java.io.File
-import java.net.URLClassLoader
-import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
+import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import spark._
-import executor.ExecutorURLClassLoader
+import spark.TaskState.TaskState
+import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
+import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import akka.actor._
- * A simple TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
+ * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
* the scheduler also allows each task to fail up to maxFailures times, which is useful for
* testing fault recovery.
-private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
+case class LocalReviveOffers()
+case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
+ def receive = {
+ case LocalReviveOffers =>
+ launchTask(localScheduler.resourceOffer(freeCores))
+ case LocalStatusUpdate(taskId, state, serializeData) =>
+ freeCores += 1
+ localScheduler.statusUpdate(taskId, state, serializeData)
+ launchTask(localScheduler.resourceOffer(freeCores))
+ }
+ def launchTask(tasks : Seq[TaskDescription]) {
+ for (task <- tasks) {
+ freeCores -= 1
+ localScheduler.threadPool.submit(new Runnable {
+ def run() {
+ localScheduler.runTask(task.taskId, task.serializedTask)
+ }
+ })
+ }
+ }
+private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
extends TaskScheduler
with Logging {
@@ -31,79 +84,136 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
- // TODO: Need to take into account stage priority in scheduling
+ var schedulableBuilder: SchedulableBuilder = null
+ var rootPool: Pool = null
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+ val activeTaskSets = new HashMap[String, TaskSetManager]
+ val taskIdToTaskSetId = new HashMap[Long, String]
+ val taskSetTaskIds = new HashMap[String, HashSet[Long]]
- override def start() { }
+ var localActor: ActorRef = null
+ override def start() {
+ // temporarily set rootPool name to empty
+ rootPool = new Pool("", schedulingMode, 0, 0)
+ schedulableBuilder = {
+ schedulingMode match {
+ case SchedulingMode.FIFO =>
+ new FIFOSchedulableBuilder(rootPool)
+ case SchedulingMode.FAIR =>
+ new FairSchedulableBuilder(rootPool)
+ }
+ }
+ schedulableBuilder.buildPools()
+ localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
+ }
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
override def submitTasks(taskSet: TaskSet) {
- val tasks = taskSet.tasks
- val failCount = new Array[Int](tasks.size)
- def submitTask(task: Task[_], idInJob: Int) {
- val myAttemptId = attemptId.getAndIncrement()
- threadPool.submit(new Runnable {
- def run() {
- runTask(task, idInJob, myAttemptId)
- }
- })
+ synchronized {
+ val manager = new LocalTaskSetManager(this, taskSet)
+ schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
+ activeTaskSets(taskSet.id) = manager
+ taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+ localActor ! LocalReviveOffers
+ }
- def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
- logInfo("Running " + task)
- // Set the Spark execution environment for the worker thread
- SparkEnv.set(env)
- try {
- Accumulators.clear()
- Thread.currentThread().setContextClassLoader(classLoader)
- // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
- // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val bytes = Task.serializeWithDependencies(task, sc.addedFiles, sc.addedJars, ser)
- logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes")
- val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
- updateDependencies(taskFiles, taskJars) // Download any files added with addFile
- val deserializedTask = ser.deserialize[Task[_]](
- taskBytes, Thread.currentThread.getContextClassLoader)
- // Run it
- val result: Any = deserializedTask.run(attemptId)
- // Serialize and deserialize the result to emulate what the Mesos
- // executor does. This is useful to catch serialization errors early
- // on in development (so when users move their local Spark programs
- // to the cluster, they don't get surprised by serialization errors).
- val resultToReturn = ser.deserialize[Any](ser.serialize(result))
- val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
- ser.serialize(Accumulators.values))
- logInfo("Finished " + task)
- // If the threadpool has not already been shutdown, notify DAGScheduler
- if (!Thread.currentThread().isInterrupted)
- listener.taskEnded(task, Success, resultToReturn, accumUpdates)
- } catch {
- case t: Throwable => {
- logError("Exception in task " + idInJob, t)
- failCount.synchronized {
- failCount(idInJob) += 1
- if (failCount(idInJob) <= maxFailures) {
- submitTask(task, idInJob)
- } else {
- // TODO: Do something nicer here to return all the way to the user
- if (!Thread.currentThread().isInterrupted)
- listener.taskEnded(task, new ExceptionFailure(t), null, null)
- }
+ def resourceOffer(freeCores: Int): Seq[TaskDescription] = {
+ synchronized {
+ var freeCpuCores = freeCores
+ val tasks = new ArrayBuffer[TaskDescription](freeCores)
+ val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+ for (manager <- sortedTaskSetQueue) {
+ logDebug("parentName:%s,name:%s,runningTasks:%s".format(
+ manager.parent.name, manager.name, manager.runningTasks))
+ }
+ var launchTask = false
+ for (manager <- sortedTaskSetQueue) {
+ do {
+ launchTask = false
+ manager.slaveOffer(null, null, freeCpuCores) match {
+ case Some(task) =>
+ tasks += task
+ taskIdToTaskSetId(task.taskId) = manager.taskSet.id
+ taskSetTaskIds(manager.taskSet.id) += task.taskId
+ freeCpuCores -= 1
+ launchTask = true
+ case None => {}
- }
+ } while(launchTask)
+ return tasks
+ }
+ }
+ def taskSetFinished(manager: TaskSetManager) {
+ synchronized {
+ activeTaskSets -= manager.taskSet.id
+ manager.parent.removeSchedulable(manager)
+ logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
+ taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
+ taskSetTaskIds -= manager.taskSet.id
+ }
+ def runTask(taskId: Long, bytes: ByteBuffer) {
+ logInfo("Running " + taskId)
+ val info = new TaskInfo(taskId, 0, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
+ // Set the Spark execution environment for the worker thread
+ SparkEnv.set(env)
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val objectSer = SparkEnv.get.serializer.newInstance()
+ var attemptedTask: Option[Task[_]] = None
+ val start = System.currentTimeMillis()
+ var taskStart: Long = 0
+ try {
+ Accumulators.clear()
+ Thread.currentThread().setContextClassLoader(classLoader)
- for ((task, i) <- tasks.zipWithIndex) {
- submitTask(task, i)
+ // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
+ // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
+ val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
+ updateDependencies(taskFiles, taskJars) // Download any files added with addFile
+ val deserializedTask = ser.deserialize[Task[_]](
+ taskBytes, Thread.currentThread.getContextClassLoader)
+ attemptedTask = Some(deserializedTask)
+ val deserTime = System.currentTimeMillis() - start
+ taskStart = System.currentTimeMillis()
+ // Run it
+ val result: Any = deserializedTask.run(taskId)
+ // Serialize and deserialize the result to emulate what the Mesos
+ // executor does. This is useful to catch serialization errors early
+ // on in development (so when users move their local Spark programs
+ // to the cluster, they don't get surprised by serialization errors).
+ val serResult = objectSer.serialize(result)
+ deserializedTask.metrics.get.resultSize = serResult.limit()
+ val resultToReturn = objectSer.deserialize[Any](serResult)
+ val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
+ ser.serialize(Accumulators.values))
+ val serviceTime = System.currentTimeMillis() - taskStart
+ logInfo("Finished " + taskId)
+ deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+ deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
+ val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
+ val serializedResult = ser.serialize(taskResult)
+ localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
+ } catch {
+ case t: Throwable => {
+ val serviceTime = System.currentTimeMillis() - taskStart
+ val metrics = attemptedTask.flatMap(t => t.metrics)
+ metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+ val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
+ localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
+ }
@@ -119,6 +229,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
currentFiles(name) = timestamp
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
@@ -134,7 +245,16 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
- override def stop() {
+ def statusUpdate(taskId :Long, state: TaskState, serializedData: ByteBuffer) {
+ synchronized {
+ val taskSetId = taskIdToTaskSetId(taskId)
+ val taskSetManager = activeTaskSets(taskSetId)
+ taskSetTaskIds(taskSetId) -= taskId
+ taskSetManager.statusUpdate(taskId, state, serializedData)
+ }
+ }
+ override def stop() {