diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-05 18:27:54 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-05 18:27:54 -0800 |
commit | 03eefbb2005e9e88fd79eecef3fc612e9f2ee623 (patch) | |
tree | a0572102d2545a2189334ecd7425e65273332711 /core | |
parent | a4611d66f0d2ebc4425f385988d541b8f930e505 (diff) | |
parent | 870b2aaf5d1398704f69a5b1a8be30de522b284c (diff) | |
download | spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.tar.gz spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.tar.bz2 spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.zip |
Merge pull request #451 from stephenh/fixdeathpactexception
Handle Terminated to avoid endless DeathPactExceptions.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/Worker.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 25 |
2 files changed, 13 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 2219dd6262..38547ec4f1 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,20 +1,17 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} -import akka.actor.{ActorRef, Props, Actor, ActorSystem} +import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ -import akka.remote.RemoteClientLifeCycleEvent +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import java.text.SimpleDateFormat import java.util.Date -import akka.remote.RemoteClientShutdown -import akka.remote.RemoteClientDisconnected import spark.deploy.RegisterWorker import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed import spark.deploy.master.Master -import akka.actor.Terminated import java.io.File private[spark] class Worker( diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index e45288ff53..224c126fdd 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -4,16 +4,15 @@ import java.nio.ByteBuffer import spark.Logging import spark.TaskState.TaskState import spark.util.AkkaUtils -import akka.actor.{ActorRef, Actor, Props} +import akka.actor.{ActorRef, Actor, Props, Terminated} +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import akka.remote.RemoteClientLifeCycleEvent import spark.scheduler.cluster._ import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.LaunchTask import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutor - private[spark] class StandaloneExecutorBackend( executor: Executor, driverUrl: String, @@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend( var driver: ActorRef = null override def preStart() { - try { - logInfo("Connecting to driver: " + driverUrl) - driver = context.actorFor(driverUrl) - driver ! RegisterExecutor(executorId, hostname, cores) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to driver", e) - System.exit(1) - } + logInfo("Connecting to driver: " + driverUrl) + driver = context.actorFor(driverUrl) + driver ! RegisterExecutor(executorId, hostname, cores) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(driver) // Doesn't work with remote actors, but useful for testing } override def receive = { @@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend( case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + logError("Driver terminated or disconnected! Shutting down.") + System.exit(1) } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { |