aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala10
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 24796c1430..0fd58c41cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashMap
+import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo
@@ -35,6 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
+ val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -42,7 +44,11 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
- /** Terminal values of accumulables updated during this stage. */
+
+ /**
+ * Terminal values of accumulables updated during this stage, including all the user-defined
+ * accumulators.
+ */
val accumulables = HashMap[Long, AccumulableInfo]()
def stageFailed(reason: String) {
@@ -75,6 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
+ internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -87,6 +94,7 @@ private[spark] object StageInfo {
rddInfos,
stage.parents.map(_.id),
stage.details,
+ internalAccumulators,
taskLocalityPreferences)
}
}