aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/TaskEndReason.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/TaskEndReason.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala29
1 files changed, 22 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 13241b77bf..68340cc704 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,8 +19,11 @@ package org.apache.spark
import java.io.{ObjectInputStream, ObjectOutputStream}
+import scala.util.Try
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
@@ -115,22 +118,34 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
- metrics: Option[TaskMetrics],
- private val exceptionWrapper: Option[ThrowableSerializationWrapper])
+ exceptionWrapper: Option[ThrowableSerializationWrapper],
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
extends TaskFailedReason {
+ @deprecated("use accumUpdates instead", "2.0.0")
+ val metrics: Option[TaskMetrics] = {
+ if (accumUpdates.nonEmpty) {
+ Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption
+ } else {
+ None
+ }
+ }
+
/**
* `preserveCause` is used to keep the exception itself so it is available to the
* driver. This may be set to `false` in the event that the exception is not in fact
* serializable.
*/
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) {
- this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics,
- if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+ private[spark] def this(
+ e: Throwable,
+ accumUpdates: Seq[AccumulableInfo],
+ preserveCause: Boolean) {
+ this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e),
+ if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates)
}
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
- this(e, metrics, preserveCause = true)
+ private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) {
+ this(e, accumUpdates, preserveCause = true)
}
def exception: Option[Throwable] = exceptionWrapper.flatMap {