aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Matei Zaharia <matei@eecs.berkeley.edu>, 2013-02-05 18:27:54 -0800
committer: Matei Zaharia <matei@eecs.berkeley.edu>, 2013-02-05 18:27:54 -0800
commit: 03eefbb2005e9e88fd79eecef3fc612e9f2ee623 (patch)
tree: a0572102d2545a2189334ecd7425e65273332711 /core
parent: a4611d66f0d2ebc4425f385988d541b8f930e505 (diff)
parent: 870b2aaf5d1398704f69a5b1a8be30de522b284c (diff)
download: spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.tar.gz
          spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.tar.bz2
          spark-03eefbb2005e9e88fd79eecef3fc612e9f2ee623.zip
Merge pull request #451 from stephenh/fixdeathpactexception
Handle Terminated to avoid endless DeathPactExceptions.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/deploy/worker/Worker.scala | 7
-rw-r--r-- core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 25
2 files changed, 13 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 2219dd6262..38547ec4f1 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,20 +1,17 @@
package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor, ActorSystem}
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
-import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
import spark.deploy.RegisterWorker
import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master
-import akka.actor.Terminated
import java.io.File
private[spark] class Worker(
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e45288ff53..224c126fdd 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -4,16 +4,15 @@ import java.nio.ByteBuffer
import spark.Logging
import spark.TaskState.TaskState
import spark.util.AkkaUtils
-import akka.actor.{ActorRef, Actor, Props}
+import akka.actor.{ActorRef, Actor, Props, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
-
private[spark] class StandaloneExecutorBackend(
executor: Executor,
driverUrl: String,
@@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend(
var driver: ActorRef = null
override def preStart() {
- try {
- logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
- driver ! RegisterExecutor(executorId, hostname, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver", e)
- System.exit(1)
- }
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! RegisterExecutor(executorId, hostname, cores)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ logError("Driver terminated or disconnected! Shutting down.")
+ System.exit(1)
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {