aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/InternalAccumulator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
9 files changed, 25 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 7aa9057858..0dd4ec656f 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -187,7 +187,7 @@ private[spark] object InternalAccumulator {
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
- def create(sc: SparkContext): Seq[Accumulator[_]] = {
+ def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4609b244e6..c27aad268d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -950,13 +950,6 @@ class DAGScheduler(
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
- // Create internal accumulators if the stage has no accumulators initialized.
- // Reset internal accumulators only if this stage is not partially submitted
- // Otherwise, we may override existing accumulator values from some tasks
- if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
- stage.resetInternalAccumulators()
- }
-
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
@@ -1036,7 +1029,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.internalAccumulators, properties)
+ taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
}
case stage: ResultStage =>
@@ -1046,7 +1039,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.internalAccumulators)
+ taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a40b700cdd..b6d4e39fe5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -75,22 +75,6 @@ private[scheduler] abstract class Stage(
val name: String = callSite.shortForm
val details: String = callSite.longForm
- private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
-
- /** Internal accumulators shared across all tasks in this stage. */
- def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
-
- /**
- * Re-initialize the internal accumulators associated with this stage.
- *
- * This is called every time the stage is submitted, *except* when a subset of tasks
- * belonging to this stage has already finished. Otherwise, reinitializing the internal
- * accumulators here again will override partial values from the finished tasks.
- */
- def resetInternalAccumulators(): Unit = {
- _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
- }
-
/**
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
@@ -127,7 +111,8 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
_latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
+ this, nextAttemptId, Some(numPartitionsToCompute),
+ InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
nextAttemptId += 1
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 24796c1430..0fd58c41cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashMap
+import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo
@@ -35,6 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
+ val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -42,7 +44,11 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
- /** Terminal values of accumulables updated during this stage. */
+
+ /**
+ * Terminal values of accumulables updated during this stage, including all the user-defined
+ * accumulators.
+ */
val accumulables = HashMap[Long, AccumulableInfo]()
def stageFailed(reason: String) {
@@ -75,6 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
+ internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -87,6 +94,7 @@ private[spark] object StageInfo {
rddInfos,
stage.parents.map(_.id),
stage.details,
+ internalAccumulators,
taskLocalityPreferences)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 645e2d2e36..bd4797ae8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
}
val activeStages = Buffer[StageInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 09d955300a..3b78458065 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -578,7 +578,9 @@ private[spark] object JsonProtocol {
// The "Stage Infos" field was added in Spark 1.2.0
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
- stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
+ stageIds.map { id =>
+ new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+ }
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
@@ -686,7 +688,7 @@ private[spark] object JsonProtocol {
}
val stageInfo = new StageInfo(
- stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 80a1de6065..ee6b991461 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
numTasks: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
- new StageInfo(
- stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences)
+ new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+ Seq.empty, taskLocalityPreferences)
}
private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 00f3f15c45..cd7d2e1570 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -337,7 +337,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data1 = (1 to 10).map { x => x -> x}
// second attempt -- also successful. We'll write out different data,
@@ -345,7 +345,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data2 = (11 to 20).map { x => x -> x}
// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2293c11dad..fd96fb04f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// SPARK-9809 -- this stage is submitted without a task for each partition (because some of
// the shuffle map output is still available from stage 0); make sure we've still got internal
// accumulators setup
- assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+ assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
completeShuffleMapStageSuccessfully(2, 0, 2)
completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
assert(results === Map(0 -> 1234, 1 -> 1235))