From c64690d7252248df97bbe4b2bef8f540b977842d Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Thu, 14 Nov 2013 09:34:56 -0800
Subject: Changed local backend to use Akka actor

---
 .../spark/scheduler/local/LocalBackend.scala | 80 +++++++++++++++-------
 1 file changed, 57 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3e9d31cd5e..d9b941d694 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,21 +21,26 @@ import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
 
-import org.apache.spark.{SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
 
+private case class ReviveOffers()
+
+private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private case class KillTask(taskId: Long)
+
 /**
- * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
- * on a single Executor (created by the LocalBackend) running locally.
- *
- * THREADING: Because methods can be called both from the Executor and the TaskScheduler, and
- * because the Executor class is not thread safe, all methods are synchronized.
+ * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
+ * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
+ * and the ClusterScheduler.
  */
-private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
-  extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalActor(
+  scheduler: ClusterScheduler,
+  executorBackend: LocalBackend,
+  private val totalCores: Int) extends Actor with Logging {
 
   private var freeCores = totalCores
 
@@ -44,31 +49,60 @@ private[spark] class LocalBackend(scheduler: ClusterScheduler, private val total
 
   val executor = new Executor(localExecutorId, localExecutorHostname, Seq.empty, isLocal = true)
 
-  override def start() {
-  }
+  def receive = {
+    case ReviveOffers =>
+      reviveOffers()
 
-  override def stop() {
+    case StatusUpdate(taskId, state, serializedData) =>
+      scheduler.statusUpdate(taskId, state, serializedData)
+      if (TaskState.isFinished(state)) {
+        freeCores += 1
+        reviveOffers()
+      }
+
+    case KillTask(taskId) =>
+      executor.killTask(taskId)
   }
 
-  override def reviveOffers() = synchronized {
-    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+  def reviveOffers() {
+    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= 1
-      executor.launchTask(this, task.taskId, task.serializedTask)
+      executor.launchTask(executorBackend, task.taskId, task.serializedTask)
     }
   }
+}
+
+/**
+ * LocalBackend is used when running a local version of Spark where the executor, backend, and
+ * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * on a single Executor (created by the LocalBackend) running locally.
+ */
+private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend {
+
+  var localActor: ActorRef = null
+
+  override def start() {
+    localActor = SparkEnv.get.actorSystem.actorOf(
+      Props(new LocalActor(scheduler, this, totalCores)),
+      "LocalBackendActor")
+  }
+
+  override def stop() {
+  }
+
+  override def reviveOffers() {
+    localActor ! ReviveOffers
+  }
 
   override def defaultParallelism() = totalCores
 
-  override def killTask(taskId: Long, executorId: String) = synchronized {
-    executor.killTask(taskId)
+  override def killTask(taskId: Long, executorId: String) {
+    localActor ! KillTask(taskId)
   }
 
-  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) = synchronized {
-    scheduler.statusUpdate(taskId, state, serializedData)
-    if (TaskState.isFinished(state)) {
-      freeCores += 1
-      reviveOffers()
-    }
+  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+    localActor ! StatusUpdate(taskId, state, serializedData)
   }
 }
-- 
cgit v1.2.3