Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/Executor.scala')
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 33
 1 file changed, 28 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 975a6e4eeb..790c1ae942 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -18,6 +18,7 @@
package org.apache.spark.executor
import java.io.{File, NotSerializableException}
+import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
@@ -52,7 +53,8 @@ private[spark] class Executor(
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
- isLocal: Boolean = false)
+ isLocal: Boolean = false,
+ uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
extends Logging {
logInfo(s"Starting executor ID $executorId on host $executorHostname")
@@ -78,7 +80,7 @@ private[spark] class Executor(
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
- Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+ Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
}
// Start worker thread pool
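
Aside on the constructor change above: taking the handler as a parameter (defaulting to SparkUncaughtExceptionHandler, which exits the process) makes the executor's escalation of fatal errors observable in tests without killing the JVM. A minimal sketch of such a test double, assuming only the standard java.lang.Thread.UncaughtExceptionHandler interface; the class name is hypothetical and not part of this change:

import java.lang.Thread.UncaughtExceptionHandler
import scala.collection.mutable.ArrayBuffer

// Records every throwable handed to it instead of exiting the JVM, so a test
// can assert that the executor escalated a fatal error to its handler.
class RecordingExceptionHandler extends UncaughtExceptionHandler {
  val caught = new ArrayBuffer[Throwable]()
  override def uncaughtException(thread: Thread, t: Throwable): Unit = {
    caught += t
  }
}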
@@ -342,6 +344,14 @@ private[spark] class Executor(
}
}
}
+ task.context.fetchFailed.foreach { fetchFailure =>
+ // uh-oh. it appears the user code has caught the fetch-failure without throwing any
+ // other exceptions. It's *possible* this is what the user meant to do (though highly
+ // unlikely). So we will log an error and keep going.
+ logError(s"TID ${taskId} completed successfully though internally it encountered " +
+ s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
+ s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
+ }
val taskFinish = System.currentTimeMillis()
val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
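
Note on the added block above: the executor can detect a swallowed fetch failure because the failure is recorded on the task's TaskContext at the point it occurs, and task.context.fetchFailed reads it back after the task body returns, whether or not the exception ever propagated. A stand-alone sketch of that self-registering-failure pattern; the names SwallowedFailureDemo and SelfRegisteringFailure are illustrative, not Spark's:

object SwallowedFailureDemo {
  // Per-thread slot where a failure records itself as soon as it is created.
  private val slot = new ThreadLocal[Option[Exception]] {
    override def initialValue(): Option[Exception] = None
  }

  // Hypothetical stand-in for FetchFailedException: constructing it stores a
  // reference in the slot before user code has any chance to catch it.
  class SelfRegisteringFailure(msg: String) extends Exception(msg) {
    slot.set(Some(this))
  }

  def main(args: Array[String]): Unit = {
    try {
      throw new SelfRegisteringFailure("simulated fetch failure")
    } catch {
      case _: Exception => () // "user code" swallows the exception entirely
    }
    // The framework side can still see that the failure happened.
    println(s"failure still recorded: ${slot.get().isDefined}")
  }
}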
@@ -402,8 +412,17 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
- case ffe: FetchFailedException =>
- val reason = ffe.toTaskFailedReason
+ case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
+ val reason = task.context.fetchFailed.get.toTaskFailedReason
+ if (!t.isInstanceOf[FetchFailedException]) {
+ // there was a fetch failure in the task, but some user code wrapped that exception
+ // and threw something else. Regardless, we treat it as a fetch failure.
+ val fetchFailedCls = classOf[FetchFailedException].getName
+ logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
+ s"failed, but the ${fetchFailedCls} was hidden by another " +
+ s"exception. Spark is handling this like a fetch failure and ignoring the " +
+ s"other exception: $t")
+ }
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
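
The rewritten catch above matches any non-fatal Throwable once a fetch failure is on record, so a user exception that wraps or replaces the original FetchFailedException is still reported to the scheduler as a fetch failure. A small self-contained illustration of that guarded catch clause; fetchFailureRecorded and isFatal below are simplified stand-ins for hasFetchFailure and Utils.isFatalError:

object GuardedCatchDemo {
  def main(args: Array[String]): Unit = {
    val fetchFailureRecorded = true                             // stand-in for hasFetchFailure
    def isFatal(t: Throwable): Boolean = t.isInstanceOf[Error]  // stand-in for Utils.isFatalError

    try {
      // User code replaces the original failure with its own exception.
      throw new RuntimeException("wrapped by user code")
    } catch {
      case t: Throwable if fetchFailureRecorded && !isFatal(t) =>
        println(s"treating as a fetch failure, ignoring: $t")
    }
  }
}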
@@ -455,13 +474,17 @@ private[spark] class Executor(
// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Utils.isFatalError(t)) {
- SparkUncaughtExceptionHandler.uncaughtException(t)
+ uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
} finally {
runningTasks.remove(taskId)
}
}
+
+ private def hasFetchFailure: Boolean = {
+ task != null && task.context != null && task.context.fetchFailed.isDefined
+ }
}
/**