aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-11-14 09:34:56 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2013-11-14 09:34:56 -0800
commitc64690d7252248df97bbe4b2bef8f540b977842d (patch)
tree43d78007f7521709fa8008eb3466d8422b2c034d /core/src/main/scala/org/apache
parent46f9c6b858cf9737b7d46b22b75bfc847244331b (diff)
downloadspark-c64690d7252248df97bbe4b2bef8f540b977842d.tar.gz
spark-c64690d7252248df97bbe4b2bef8f540b977842d.tar.bz2
spark-c64690d7252248df97bbe4b2bef8f540b977842d.zip
Changed local backend to use Akka actor
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala80
1 files changed, 57 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3e9d31cd5e..d9b941d694 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,21 +21,26 @@ import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef, Props}
-import org.apache.spark.{SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
+private case class ReviveOffers()
+
+private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private case class KillTask(taskId: Long)
+
/**
- * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
- * on a single Executor (created by the LocalBackend) running locally.
- *
- * THREADING: Because methods can be called both from the Executor and the TaskScheduler, and
- * because the Executor class is not thread safe, all methods are synchronized.
+ * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
+ * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
+ * and the ClusterScheduler.
*/
-private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
- extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalActor(
+ scheduler: ClusterScheduler,
+ executorBackend: LocalBackend,
+ private val totalCores: Int) extends Actor with Logging {
private var freeCores = totalCores
@@ -44,31 +49,60 @@ private[spark] class LocalBackend(scheduler: ClusterScheduler, private val total
val executor = new Executor(localExecutorId, localExecutorHostname, Seq.empty, isLocal = true)
- override def start() {
- }
+ def receive = {
+ case ReviveOffers =>
+ reviveOffers()
- override def stop() {
+ case StatusUpdate(taskId, state, serializedData) =>
+ scheduler.statusUpdate(taskId, state, serializedData)
+ if (TaskState.isFinished(state)) {
+ freeCores += 1
+ reviveOffers()
+ }
+
+ case KillTask(taskId) =>
+ executor.killTask(taskId)
}
- override def reviveOffers() = synchronized {
- val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+ def reviveOffers() {
+ val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= 1
- executor.launchTask(this, task.taskId, task.serializedTask)
+ executor.launchTask(executorBackend, task.taskId, task.serializedTask)
}
}
+}
+
+/**
+ * LocalBackend is used when running a local version of Spark where the executor, backend, and
+ * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * on a single Executor (created by the LocalBackend) running locally.
+ */
+private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
+ extends SchedulerBackend with ExecutorBackend {
+
+ var localActor: ActorRef = null
+
+ override def start() {
+ localActor = SparkEnv.get.actorSystem.actorOf(
+ Props(new LocalActor(scheduler, this, totalCores)),
+ "LocalBackendActor")
+ }
+
+ override def stop() {
+ }
+
+ override def reviveOffers() {
+ localActor ! ReviveOffers
+ }
override def defaultParallelism() = totalCores
- override def killTask(taskId: Long, executorId: String) = synchronized {
- executor.killTask(taskId)
+ override def killTask(taskId: Long, executorId: String) {
+ localActor ! KillTask(taskId)
}
- override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) = synchronized {
- scheduler.statusUpdate(taskId, state, serializedData)
- if (TaskState.isFinished(state)) {
- freeCores += 1
- reviveOffers()
- }
+ override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+ localActor ! StatusUpdate(taskId, state, serializedData)
}
}