-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala     | 18
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala        | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala        |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                  | 13
5 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 896913d796..d38e9e7920 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,13 +88,15 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            markDead("All masters are unresponsive! Giving up.")
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              markDead("All masters are unresponsive! Giving up.")
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }
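
Why the wrap is needed: a function scheduled on Akka's scheduler runs on a dispatcher thread, so a Throwable it raises never propagates to any caller; the recurring task simply stops firing and the AppClient keeps waiting forever. The sketch below reproduces the same failure mode with the plain JDK scheduler (illustrative only, not code from this patch); Akka's scheduler loses the exception in an analogous way.

    import java.util.concurrent.{Executors, TimeUnit}

    object SilentDeathSketch {
      def main(args: Array[String]): Unit = {
        val scheduler = Executors.newSingleThreadScheduledExecutor()
        // The task throws on its first run. scheduleAtFixedRate captures the
        // Throwable in the (ignored) ScheduledFuture: nothing is printed and
        // the task never runs again, so the retry loop dies invisibly.
        scheduler.scheduleAtFixedRate(new Runnable {
          def run(): Unit = {
            println("registration retry tick")
            throw new RuntimeException("boom")
          }
        }, 0, 1, TimeUnit.SECONDS)
        Thread.sleep(3000) // prints "registration retry tick" exactly once
        scheduler.shutdown()
      }
    }
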
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 85d25dc7db..134624c35a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -166,14 +166,16 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ffd1d94326..649eed213e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.Utils
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -139,7 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6d7d4f922e..6534095811 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -155,7 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
      }
    }
  }
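
The BlockManager case shows the operational stakes: if heartBeat() threw once, the schedule died silently, the master eventually marked the block manager dead, and the JVM lived on as a zombie. With the wrap, the Throwable is forwarded to the default uncaught exception handler, which can log it and kill the process. A minimal sketch of that forwarding (the helper and the installed handler are stand-ins for illustration, not the patch's code):

    import java.util.concurrent.{Executors, TimeUnit}

    object ForwardingSketch {
      def main(args: Array[String]): Unit = {
        // Install a default handler so forwarded failures become visible.
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          def uncaughtException(thread: Thread, t: Throwable): Unit =
            System.err.println(s"scheduled task died: $t") // Spark's handler would also exit
        })

        // Stand-in for Utils.tryOrExit: catch everything, forward to the handler.
        def tryOrExit(block: => Unit): Unit =
          try block catch {
            case t: Throwable =>
              Thread.getDefaultUncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
          }

        val scheduler = Executors.newSingleThreadScheduledExecutor()
        scheduler.scheduleAtFixedRate(new Runnable {
          def run(): Unit = tryOrExit { throw new RuntimeException("storage failure") }
        }, 0, 1, TimeUnit.SECONDS)
        Thread.sleep(2500) // the handler message appears on every tick, not just silence
        scheduler.shutdown()
      }
    }
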
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 99ef6dd1fa..d041bfa66a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -40,6 +40,7 @@ import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /**
@@ -781,6 +782,18 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
+  /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
    */
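
The helper deliberately catches Throwable rather than only non-fatal exceptions: the point is not to recover but to hand everything, Errors included, to ExecutorUncaughtExceptionHandler, which logs the Throwable and then terminates the JVM, hence the name tryOrExit. A minimal standalone analogue of that behavior (the exit code here is an assumption for illustration, not taken from the patch):

    object TryOrExitDemo {
      // Analogue of Utils.tryOrExit: on any Throwable, log and kill the JVM.
      def tryOrExit(block: => Unit): Unit =
        try block catch {
          case t: Throwable =>
            System.err.println(s"Uncaught exception, exiting: $t")
            System.exit(50) // assumed stand-in for Spark's uncaught-exception exit code
        }

      def main(args: Array[String]): Unit = {
        tryOrExit { println("periodic work ran fine") } // success path: block just runs
        tryOrExit { sys.error("simulated failure") }    // failure path: process exits here
        println("never reached")
      }
    }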