-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala  | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala  | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala  | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala  | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala  | 53
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala  | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala  | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala  | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala  | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala  | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala  | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala  | 26
19 files changed, 127 insertions, 140 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 54e08d7866..e2d2250982 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
/**
* Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
/** Keep cleaning RDD, shuffle, and broadcast state. */
- private def keepCleaning() {
+ private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
- case t: Throwable => logError("Error in cleaning thread", t)
+ case e: Exception => logError("Error in cleaning thread", e)
}
}
}
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
- case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+ case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}
@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
- case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+ case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}
@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
} catch {
- case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
+ case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}
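
The pattern applied in this file recurs throughout the patch: the body of a long-running thread is wrapped in Utils.logUncaughtExceptions so anything that escapes is logged and re-thrown, while the loop itself catches only Exception and keeps running. A minimal sketch of that pattern follows; CleanerExample and doOneCleanupPass are hypothetical names, and the code is assumed to live in a package under org.apache.spark since Utils is private[spark].

    package org.apache.spark.demo  // assumption: any package under org.apache.spark

    import org.apache.spark.util.Utils

    object CleanerExample {
      @volatile private var stopped = false

      private val cleaningThread = new Thread("example-cleaning-thread") {
        // Fatal errors escape the loop, get logged by logUncaughtExceptions, and are re-thrown;
        // recoverable failures are caught as Exception and the loop continues.
        override def run(): Unit = Utils.logUncaughtExceptions {
          while (!stopped) {
            try {
              doOneCleanupPass()
            } catch {
              case e: Exception => println("Error in cleaning thread: " + e)
            }
          }
        }
      }

      def start(): Unit = { cleaningThread.setDaemon(true); cleaningThread.start() }
      def stop(): Unit = { stopped = true }

      private def doOneCleanupPass(): Unit = { /* clean one reference */ }
    }
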
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 71bab29544..e6121a7054 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}
@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2971c277aa..57b28b9972 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
this.interrupt()
}
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -282,7 +282,6 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
- case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 002f2acd94..759cbe2c46 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
- case e: Throwable => throw e
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7ead117152..aeb159adc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
- "driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+ "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e2df1b8954..148115d3ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -103,7 +103,7 @@ object SparkHadoopUtil {
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
- case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1238bbf9da..a9c11dca56 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -70,7 +70,7 @@ class HistoryServer(
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
numCompletedApplications = logInfos.size
} catch {
- case t: Throwable => logError("Exception in checking for event log updates", t)
+ case e: Exception => logError("Exception in checking for event log updates", e)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
dir.getModificationTime
}
} catch {
- case t: Throwable =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+ case e: Exception =>
+ logError("Exception in accessing modification time of %s".format(dir.getPath), e)
-1L
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f254f5585b..c6dec305bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -684,8 +684,8 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
return true
} catch {
- case t: Throwable =>
- logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+ case e: Exception =>
+ logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index be15138f62..05e242e6df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -31,7 +31,7 @@ object DriverWrapper {
case workerUrl :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
- Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+ Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
// Delegate to supplied main class
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e912ae8a5d..84aec65b77 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a
// SparkEnv / Executor before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true, conf = conf, new SecurityManager(conf))
+ conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 98e7e0be81..baee7a216a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,28 +74,7 @@ private[spark] class Executor(
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler {
- override def uncaughtException(thread: Thread, exception: Throwable) {
- try {
- logError("Uncaught exception in thread " + thread, exception)
-
- // We may have been called from a shutdown hook. If so, we must not call System.exit().
- // (If we do, we will deadlock.)
- if (!Utils.inShutdown()) {
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
- }
- }
- } catch {
- case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
- case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
- }
- }
- }
- )
+ Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
}
val executorSource = new ExecutorSource(this, executorId)
@@ -259,6 +238,11 @@ private[spark] class Executor(
}
case t: Throwable => {
+ // Attempt to exit cleanly by informing the driver of our failure.
+ // If anything goes wrong (or this was a fatal exception), we will delegate to
+ // the default uncaught exception handler, which will terminate the Executor.
+ logError("Exception in task ID " + taskId, t)
+
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
@@ -268,10 +252,11 @@ private[spark] class Executor(
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
- // TODO: Should we exit the whole executor here? On the one hand, the failed task may
- // have left some weird state around depending on when the exception was thrown, but on
- // the other hand, maybe we could detect that when future tasks fail and exit then.
- logError("Exception in task ID " + taskId, t)
+ // Don't forcibly exit unless the exception was inherently fatal, to avoid
+ // stopping other tasks unnecessarily.
+ if (Utils.isFatalError(t)) {
+ ExecutorUncaughtExceptionHandler.uncaughtException(t)
+ }
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
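
The task-failure branch above is the one place that still catches Throwable deliberately: it reports the failure to the driver first, then escalates to the process-killing handler only when Utils.isFatalError says the error is unrecoverable. A condensed sketch of that flow, with hypothetical runTaskSafely and reportTaskFailure names standing in for the real task loop and execBackend.statusUpdate:

    import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
    import org.apache.spark.util.Utils

    object TaskFailureExample {  // hypothetical; assumes a package under org.apache.spark
      def runTaskSafely(taskId: Long)(body: => Unit): Unit = {
        try {
          body
        } catch {
          case t: Throwable =>
            // Attempt to exit cleanly by informing the caller of the failure first.
            reportTaskFailure(taskId, t)
            // Only take the whole executor down for inherently fatal errors (OOM, etc.).
            if (Utils.isFatalError(t)) {
              ExecutorUncaughtExceptionHandler.uncaughtException(t)
            }
        }
      }

      private def reportTaskFailure(taskId: Long, t: Throwable): Unit = {
        println(s"Task $taskId failed: $t")  // stands in for a real status update to the driver
      }
    }
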
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
new file mode 100644
index 0000000000..b0e984c039
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+ extends Thread.UncaughtExceptionHandler with Logging {
+
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+
+ // We may have been called from a shutdown hook. If so, we must not call System.exit().
+ // (If we do, we will deadlock.)
+ if (!Utils.inShutdown()) {
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+ }
+ }
+ } catch {
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+ }
+ }
+
+ def uncaughtException(exception: Throwable) {
+ uncaughtException(Thread.currentThread(), exception)
+ }
+}
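
Because the handler is an object, it can be installed process-wide with a single call, which is exactly what Executor now does. A tiny illustration of the behaviour (assumed demo code, not part of the patch): any thread that dies with an uncaught exception terminates the JVM with a well-defined ExecutorExitCode instead of disappearing silently.

    import org.apache.spark.executor.ExecutorUncaughtExceptionHandler

    object HandlerDemo {  // hypothetical; assumes a package under org.apache.spark
      def main(args: Array[String]): Unit = {
        Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
        new Thread("doomed-thread") {
          // This uncaught exception is logged and, outside a shutdown hook,
          // leads to System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION).
          override def run(): Unit = throw new RuntimeException("boom")
        }.start()
      }
    }
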
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7968a0691d..a90b0d475c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging {
applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
)
} catch {
- case t: Throwable =>
- logError("Exception in parsing logging info from directory %s".format(logDir), t)
+ case e: Exception =>
+ logError("Exception in parsing logging info from directory %s".format(logDir), e)
EventLoggingInfo.empty
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c9ad2b151d..99d305b36a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
try {
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] => directResult
@@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
- case ex: Throwable =>
+ case ex: Exception =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
@@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
serializedData: ByteBuffer) {
var reason : TaskEndReason = UnknownReason
getTaskResultExecutor.execute(new Runnable {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
@@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
val loader = Utils.getContextOrSparkClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex: Throwable => {}
+ case ex: Exception => {}
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index cf6ef0029a..3a7243a1ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.stop()
}
@@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
try {
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
- case t: Throwable =>
- logError("Exception while deleting local spark dir: " + localDir, t)
+ case e: Exception =>
+ logError("Exception while deleting local spark dir: " + localDir, e)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index b0b9674856..a6cbe3aa44 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -25,7 +25,6 @@ import tachyon.client.TachyonFile
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.ShuffleSender
import org.apache.spark.util.Utils
@@ -137,7 +136,7 @@ private[spark] class TachyonBlockManager(
private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
- override def run() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
@@ -145,8 +144,8 @@ private[spark] class TachyonBlockManager(
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
- case t: Throwable =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, t)
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8afe09a117..a8d12bb2a0 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
-import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
@@ -41,7 +41,7 @@ private[spark] object AkkaUtils extends Logging {
* If indestructible is set to true, the Actor System will continue running in the event
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
+ def createActorSystem(name: String, host: String, port: Int,
conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
@@ -101,12 +101,7 @@ private[spark] object AkkaUtils extends Logging {
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin))
- val actorSystem = if (indestructible) {
- IndestructibleActorSystem(name, akkaConf)
- } else {
- ActorSystem(name, akkaConf)
- }
-
+ val actorSystem = ActorSystem(name, akkaConf)
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
(actorSystem, boundPort)
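
With the indestructible flag gone, callers pass only the SparkConf and SecurityManager, as the updated call sites earlier in this diff show. For reference, a typical call now looks like this (sketch with an assumed "example" system name, in a package under org.apache.spark):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.util.{AkkaUtils, Utils}

    object ActorSystemExample {  // hypothetical
      def create(): Unit = {
        val conf = new SparkConf()
        val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
          "example", Utils.localHostName(), 0, conf, new SecurityManager(conf))
        println(s"Actor system bound to port $boundPort")
        actorSystem.shutdown()
      }
    }
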
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
deleted file mode 100644
index 4188a869c1..0000000000
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Must be in akka.actor package as ActorSystemImpl is protected[akka].
-package akka.actor
-
-import scala.util.control.{ControlThrowable, NonFatal}
-
-import com.typesafe.config.Config
-
-/**
- * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
- * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see org.apache.spark.executor.Executor)
- */
-object IndestructibleActorSystem {
- def apply(name: String, config: Config): ActorSystem =
- apply(name, config, ActorSystem.findClassLoader())
-
- def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
- new IndestructibleActorSystemImpl(name, config, classLoader).start()
-}
-
-private[akka] class IndestructibleActorSystemImpl(
- override val name: String,
- applicationConfig: Config,
- classLoader: ClassLoader)
- extends ActorSystemImpl(name, applicationConfig, classLoader) {
-
- protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
- val fallbackHandler = super.uncaughtExceptionHandler
-
- new Thread.UncaughtExceptionHandler() {
- def uncaughtException(thread: Thread, cause: Throwable): Unit = {
- if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
- log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
- "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
- // shutdown() //TODO make it configurable
- } else {
- fallbackHandler.uncaughtException(thread, cause)
- }
- }
- }
- }
-
- def isFatalError(e: Throwable): Boolean = {
- e match {
- case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
- false
- case _ =>
- true
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 95777fbf57..8f7594ada2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
+import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -41,7 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-
/**
* Various utility methods used by Spark.
*/
@@ -1125,4 +1125,28 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Executes the given block, printing and re-throwing any uncaught exceptions.
+ * This is particularly useful for wrapping code that runs in a thread, to ensure
+ * that exceptions are printed, and to avoid having to catch Throwable.
+ */
+ def logUncaughtExceptions[T](f: => T): T = {
+ try {
+ f
+ } catch {
+ case t: Throwable =>
+ logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+ throw t
+ }
+ }
+
+ /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+ def isFatalError(e: Throwable): Boolean = {
+ e match {
+ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+ false
+ case _ =>
+ true
+ }
+ }
}
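
The new isFatalError predicate mirrors scala.util.control.NonFatal but additionally treats InterruptedException, NotImplementedError, and ControlThrowable as non-fatal. A few illustrative cases (assumed demo, not part of the patch; assumes a package under org.apache.spark since Utils is private[spark]):

    import org.apache.spark.util.Utils

    object FatalErrorExamples {  // hypothetical
      val ordinary    = Utils.isFatalError(new RuntimeException("task failed")) // false: NonFatal
      val interrupted = Utils.isFatalError(new InterruptedException())          // false: whitelisted
      val oom         = Utils.isFatalError(new OutOfMemoryError())              // true: VM error
      val overflow    = Utils.isFatalError(new StackOverflowError())            // true: VM error
    }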