aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/TaskContextImpl.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContextImpl.scala11
1 files changed, 11 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index c904e08391..dc0d128785 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util._
private[spark] class TaskContextImpl(
@@ -56,6 +57,10 @@ private[spark] class TaskContextImpl(
// Whether the task has failed.
@volatile private var failed: Boolean = false
+ // If there was a fetch failure in the task, we store it here, to make sure user-code doesn't
+ // hide the exception. See SPARK-19276
+ @volatile private var _fetchFailedException: Option[FetchFailedException] = None
+
override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
this
@@ -126,4 +131,10 @@ private[spark] class TaskContextImpl(
taskMetrics.registerAccumulator(a)
}
+ private[spark] override def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
+ this._fetchFailedException = Option(fetchFailed)
+ }
+
+ private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
+
}