Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala  18
1 file changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 7839023868..46f0ef2cc6 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -19,13 +19,13 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class StandaloneExecutorBackend(
@@ -40,14 +40,14 @@ private[spark] class StandaloneExecutorBackend(
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ // context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -69,7 +69,7 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
}
@@ -90,8 +90,8 @@ private[spark] object StandaloneExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
- val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+ actorSystem.actorOf(
+ Props(classOf[StandaloneExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
actorSystem.awaitTermination()
}
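
For reference, a minimal sketch of the pattern this diff adopts, written against the Akka 2.2 API. The PeerWatcher actor, peerUrl parameter, and the "hello" message are hypothetical illustrations, not Spark code:

import akka.actor._
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

// Hypothetical example: connect to a remote peer via actorSelection and react
// to Akka 2.2 remoting lifecycle events, mirroring the migration above
// (actorFor -> actorSelection, RemoteClientLifeCycleEvent -> RemotingLifecycleEvent).
class PeerWatcher(peerUrl: String) extends Actor with ActorLogging {
  // actorSelection resolves lazily, so messages can be sent before the peer
  // is reachable; unlike an ActorRef it cannot be death-watched directly,
  // hence the event-stream subscription below.
  val peer: ActorSelection = context.actorSelection(peerUrl)

  override def preStart() {
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    peer ! "hello"
  }

  def receive = {
    case _: DisassociatedEvent | _: AssociationErrorEvent =>
      log.error("Remote peer lost; stopping.")
      context.stop(self)
  }
}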