diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/TaskEndReason.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/TaskEndReason.scala | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 13241b77bf..68340cc704 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -19,8 +19,11 @@ package org.apache.spark import java.io.{ObjectInputStream, ObjectOutputStream} +import scala.util.Try + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -115,22 +118,34 @@ case class ExceptionFailure( description: String, stackTrace: Array[StackTraceElement], fullStackTrace: String, - metrics: Option[TaskMetrics], - private val exceptionWrapper: Option[ThrowableSerializationWrapper]) + exceptionWrapper: Option[ThrowableSerializationWrapper], + accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]) extends TaskFailedReason { + @deprecated("use accumUpdates instead", "2.0.0") + val metrics: Option[TaskMetrics] = { + if (accumUpdates.nonEmpty) { + Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption + } else { + None + } + } + /** * `preserveCause` is used to keep the exception itself so it is available to the * driver. This may be set to `false` in the event that the exception is not in fact * serializable. */ - private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) { - this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics, - if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None) + private[spark] def this( + e: Throwable, + accumUpdates: Seq[AccumulableInfo], + preserveCause: Boolean) { + this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), + if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates) } - private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) { - this(e, metrics, preserveCause = true) + private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) { + this(e, accumUpdates, preserveCause = true) } def exception: Option[Throwable] = exceptionWrapper.flatMap { |