aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-05 18:58:00 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-05 18:58:00 -0600
commit0e19093fd89ec9740f98cdcffd1ec09f4faf2490 (patch)
treec5039836e41b109624beca3ebf3eb48f182d7c6b /core
parentae26911ec0d768dcdae8b7d706ca4544e36535e6 (diff)
downloadspark-0e19093fd89ec9740f98cdcffd1ec09f4faf2490.tar.gz
spark-0e19093fd89ec9740f98cdcffd1ec09f4faf2490.tar.bz2
spark-0e19093fd89ec9740f98cdcffd1ec09f4faf2490.zip
Handle Terminated to avoid endless DeathPactExceptions.
Credit to Roland Kuhn, Akka's tech lead, for pointing out this various obvious fix, but StandaloneExecutorBackend.preStart's catch block would never (ever) get hit, because all of the operation's in preStart are async. So, the System.exit in the catch block was skipped, and instead Akka was sending Terminated messages which, since we didn't handle, it turned into DeathPactException, which started a postRestart/preStart infinite loop.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala7
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala25
2 files changed, 13 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 8b41620d98..48177a638a 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,19 +1,16 @@
package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor}
+import akka.actor.{ActorRef, Props, Actor, Terminated}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
-import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
import spark.deploy.RegisterWorker
import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
import java.io.File
private[spark] class Worker(
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e45288ff53..224c126fdd 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -4,16 +4,15 @@ import java.nio.ByteBuffer
import spark.Logging
import spark.TaskState.TaskState
import spark.util.AkkaUtils
-import akka.actor.{ActorRef, Actor, Props}
+import akka.actor.{ActorRef, Actor, Props, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
-
private[spark] class StandaloneExecutorBackend(
executor: Executor,
driverUrl: String,
@@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend(
var driver: ActorRef = null
override def preStart() {
- try {
- logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
- driver ! RegisterExecutor(executorId, hostname, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver", e)
- System.exit(1)
- }
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! RegisterExecutor(executorId, hostname, cores)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ logError("Driver terminated or disconnected! Shutting down.")
+ System.exit(1)
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {