Diffstat (limited to 'core/src/main/scala/org')
 core/src/main/scala/org/apache/spark/InternalAccumulator.scala    |  2
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 11
 core/src/main/scala/org/apache/spark/scheduler/Stage.scala        | 19
 core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala    | 10
 core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala        |  2
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala      |  6
 6 files changed, 19 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 7aa9057858..0dd4ec656f 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -187,7 +187,7 @@ private[spark] object InternalAccumulator {
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
- def create(sc: SparkContext): Seq[Accumulator[_]] = {
+ def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
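
The comment above explains the purpose of these accumulators: every task in a stage adds to the same set, so the driver can report how each metric is distributed across tasks. A minimal plain-Scala sketch of that idea (illustrative only, not Spark's Accumulator API; the metric name and helper are hypothetical):

    object TaskMetricDistributionSketch {
      // Summarize one internal metric across all tasks of a stage: (min, median, max).
      def distribution(taskValues: Seq[Long]): (Long, Long, Long) = {
        val sorted = taskValues.sorted
        (sorted.head, sorted(sorted.size / 2), sorted.last)
      }

      def main(args: Array[String]): Unit = {
        val peakMemoryPerTask = Seq(64L, 128L, 96L, 512L) // values reported by four tasks
        println(distribution(peakMemoryPerTask))          // prints (64,128,512)
      }
    }
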
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4609b244e6..c27aad268d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -950,13 +950,6 @@ class DAGScheduler(
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
- // Create internal accumulators if the stage has no accumulators initialized.
- // Reset internal accumulators only if this stage is not partially submitted
- // Otherwise, we may override existing accumulator values from some tasks
- if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
- stage.resetInternalAccumulators()
- }
-
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
@@ -1036,7 +1029,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.internalAccumulators, properties)
+ taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
}
case stage: ResultStage =>
@@ -1046,7 +1039,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.internalAccumulators)
+ taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
}
}
} catch {
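
With the conditional reset gone, the scheduler no longer consults a mutable field on Stage; each task is built from whatever accumulators the latest attempt's StageInfo carries. A simplified plain-Scala sketch of that flow (illustrative only; the types and field names are stand-ins, not Spark's):

    object TaskCreationSketch {
      final case class AttemptInfo(attemptId: Int, internalAccumulators: Seq[String])
      final case class Task(stageId: Int, attemptId: Int, partition: Int, accums: Seq[String])

      // Every task of the attempt captures the same accumulator set from the attempt info.
      def makeTasks(stageId: Int, latestInfo: AttemptInfo, partitions: Seq[Int]): Seq[Task] =
        partitions.map(p => Task(stageId, latestInfo.attemptId, p, latestInfo.internalAccumulators))

      def main(args: Array[String]): Unit = {
        val info = AttemptInfo(attemptId = 0, internalAccumulators = Seq("metricA", "metricB"))
        makeTasks(stageId = 3, latestInfo = info, partitions = Seq(0, 1, 2)).foreach(println)
      }
    }
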
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a40b700cdd..b6d4e39fe5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -75,22 +75,6 @@ private[scheduler] abstract class Stage(
val name: String = callSite.shortForm
val details: String = callSite.longForm
- private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
-
- /** Internal accumulators shared across all tasks in this stage. */
- def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
-
- /**
- * Re-initialize the internal accumulators associated with this stage.
- *
- * This is called every time the stage is submitted, *except* when a subset of tasks
- * belonging to this stage has already finished. Otherwise, reinitializing the internal
- * accumulators here again will override partial values from the finished tasks.
- */
- def resetInternalAccumulators(): Unit = {
- _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
- }
-
/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
@@ -127,7 +111,8 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
_latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
+ this, nextAttemptId, Some(numPartitionsToCompute),
+ InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
nextAttemptId += 1
}
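
Creating the accumulators inside makeNewStageAttempt means every attempt starts from fresh instances, which is why the "reset unless partially submitted" special case in the DAGScheduler could be deleted. A small plain-Scala sketch of why per-attempt instances cannot clobber earlier values (illustrative only; Counter and AttemptInfo are stand-ins for Spark's classes):

    object PerAttemptAccumulatorSketch {
      final class Counter { var value: Long = 0L; def add(v: Long): Unit = value += v }
      final case class AttemptInfo(attemptId: Int, counters: Seq[Counter])

      // Analogous to makeNewStageAttempt: each attempt gets its own counters.
      def newAttempt(attemptId: Int): AttemptInfo = AttemptInfo(attemptId, Seq.fill(3)(new Counter))

      def main(args: Array[String]): Unit = {
        val attempt0 = newAttempt(0)
        attempt0.counters.head.add(10)   // tasks of attempt 0 report partial values

        val attempt1 = newAttempt(1)     // a retry starts from fresh counters...
        attempt1.counters.head.add(5)

        // ...so the retry can never overwrite what attempt 0 already recorded.
        println(attempt0.counters.head.value) // 10
        println(attempt1.counters.head.value) // 5
      }
    }
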
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 24796c1430..0fd58c41cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashMap
+import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo
@@ -35,6 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
+ val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -42,7 +44,11 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
- /** Terminal values of accumulables updated during this stage. */
+
+ /**
+ * Terminal values of accumulables updated during this stage, including all the user-defined
+ * accumulators.
+ */
val accumulables = HashMap[Long, AccumulableInfo]()
def stageFailed(reason: String) {
@@ -75,6 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
+ internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -87,6 +94,7 @@ private[spark] object StageInfo {
rddInfos,
stage.parents.map(_.id),
stage.details,
+ internalAccumulators,
taskLocalityPreferences)
}
}
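
Because the new internalAccumulators parameter defaults to Seq.empty, call sites that stop at the details argument keep compiling unchanged, and callers with no live accumulators, such as JobPage and JsonProtocol below, can simply pass the empty Seq. A simplified plain-Scala sketch of that compatibility property (illustrative signature only, not the real StageInfo):

    class StageInfoLike(
        val stageId: Int,
        val name: String,
        val internalAccumulators: Seq[String] = Seq.empty,
        val taskLocalityPreferences: Seq[Seq[String]] = Seq.empty)

    object StageInfoLikeUsage {
      def main(args: Array[String]): Unit = {
        val legacy = new StageInfoLike(1, "unknown")                 // old-style call, default applies
        val withAccums = new StageInfoLike(2, "map", Seq("metricA")) // new-style call
        println(legacy.internalAccumulators.isEmpty)                 // true
        println(withAccums.internalAccumulators.size)                // 1
      }
    }
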
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 645e2d2e36..bd4797ae8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
}
val activeStages = Buffer[StageInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 09d955300a..3b78458065 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -578,7 +578,9 @@ private[spark] object JsonProtocol {
// The "Stage Infos" field was added in Spark 1.2.0
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
- stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
+ stageIds.map { id =>
+ new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+ }
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
@@ -686,7 +688,7 @@ private[spark] object JsonProtocol {
}
val stageInfo = new StageInfo(
- stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
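
A replayed event log carries no live accumulator objects, so every StageInfo reconstructed here gets Seq.empty, including the placeholder entries built when the "Stage Infos" field is missing (logs written before Spark 1.2.0). A plain-Scala sketch of that fallback shape (illustrative only; StageInfoLite is a stand-in, not Spark's class):

    object ReplayFallbackSketch {
      final case class StageInfoLite(stageId: Int, name: String, internalAccumulators: Seq[String])

      // If the logged field is absent, synthesize "unknown" placeholders with no accumulators.
      def stageInfos(logged: Option[Seq[StageInfoLite]], stageIds: Seq[Int]): Seq[StageInfoLite] =
        logged.getOrElse(stageIds.map(id => StageInfoLite(id, "unknown", Seq.empty)))

      def main(args: Array[String]): Unit = {
        println(stageInfos(None, Seq(0, 1)))  // two placeholder entries
      }
    }
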