diff options
10 files changed, 26 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 7aa9057858..0dd4ec656f 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -187,7 +187,7 @@ private[spark] object InternalAccumulator { * add to the same set of accumulators. We do this to report the distribution of accumulator * values across all tasks within each stage. */ - def create(sc: SparkContext): Seq[Accumulator[_]] = { + def createAll(sc: SparkContext): Seq[Accumulator[_]] = { val accums = createAll() accums.foreach { accum => Accumulators.register(accum) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4609b244e6..c27aad268d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -950,13 +950,6 @@ class DAGScheduler( // First figure out the indexes of partition ids to compute. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() - // Create internal accumulators if the stage has no accumulators initialized. - // Reset internal accumulators only if this stage is not partially submitted - // Otherwise, we may override existing accumulator values from some tasks - if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) { - stage.resetInternalAccumulators() - } - // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties @@ -1036,7 +1029,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.internalAccumulators, properties) + taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties) } case stage: ResultStage => @@ -1046,7 +1039,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.internalAccumulators) + taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a40b700cdd..b6d4e39fe5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -75,22 +75,6 @@ private[scheduler] abstract class Stage( val name: String = callSite.shortForm val details: String = callSite.longForm - private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty - - /** Internal accumulators shared across all tasks in this stage. */ - def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators - - /** - * Re-initialize the internal accumulators associated with this stage. - * - * This is called every time the stage is submitted, *except* when a subset of tasks - * belonging to this stage has already finished. Otherwise, reinitializing the internal - * accumulators here again will override partial values from the finished tasks. - */ - def resetInternalAccumulators(): Unit = { - _internalAccumulators = InternalAccumulator.create(rdd.sparkContext) - } - /** * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this @@ -127,7 +111,8 @@ private[scheduler] abstract class Stage( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { _latestInfo = StageInfo.fromStage( - this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences) + this, nextAttemptId, Some(numPartitionsToCompute), + InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences) nextAttemptId += 1 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 24796c1430..0fd58c41cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashMap +import org.apache.spark.Accumulator import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.RDDInfo @@ -35,6 +36,7 @@ class StageInfo( val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String, + val internalAccumulators: Seq[Accumulator[_]] = Seq.empty, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None @@ -42,7 +44,11 @@ class StageInfo( var completionTime: Option[Long] = None /** If the stage failed, the reason why. */ var failureReason: Option[String] = None - /** Terminal values of accumulables updated during this stage. */ + + /** + * Terminal values of accumulables updated during this stage, including all the user-defined + * accumulators. + */ val accumulables = HashMap[Long, AccumulableInfo]() def stageFailed(reason: String) { @@ -75,6 +81,7 @@ private[spark] object StageInfo { stage: Stage, attemptId: Int, numTasks: Option[Int] = None, + internalAccumulators: Seq[Accumulator[_]] = Seq.empty, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) @@ -87,6 +94,7 @@ private[spark] object StageInfo { rddInfos, stage.parents.map(_.id), stage.details, + internalAccumulators, taskLocalityPreferences) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 645e2d2e36..bd4797ae8e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { // This could be empty if the JobProgressListener hasn't received information about the // stage or if the stage information has been garbage collected listener.stageIdToInfo.getOrElse(stageId, - new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown")) + new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty)) } val activeStages = Buffer[StageInfo]() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 09d955300a..3b78458065 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -578,7 +578,9 @@ private[spark] object JsonProtocol { // The "Stage Infos" field was added in Spark 1.2.0 val stageInfos = Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { - stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")) + stageIds.map { id => + new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty) + } } SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) } @@ -686,7 +688,7 @@ private[spark] object JsonProtocol { } val stageInfo = new StageInfo( - stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) + stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 80a1de6065..ee6b991461 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { numTasks: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { - new StageInfo( - stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences) + new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", + Seq.empty, taskLocalityPreferences) } private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = { diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 00f3f15c45..cd7d2e1570 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -337,7 +337,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // first attempt -- its successful val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val data1 = (1 to 10).map { x => x -> x} // second attempt -- also successful. We'll write out different data, @@ -345,7 +345,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // depending on what gets spilled, what gets combined, etc. val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0, new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val data2 = (11 to 20).map { x => x -> x} // interleave writes of both attempts -- we want to test that both attempts can occur @@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val readData = reader.read().toIndexedSeq assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2293c11dad..fd96fb04f8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // SPARK-9809 -- this stage is submitted without a task for each partition (because some of // the shuffle map output is still available from stage 0); make sure we've still got internal // accumulators setup - assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty) + assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty) completeShuffleMapStageSuccessfully(2, 0, 2) completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) assert(results === Map(0 -> 1234, 1 -> 1235)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 7db1f9654b..01687877ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -114,7 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { } val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0) val taskContext = new TaskContextImpl( - 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.create(sc)) + 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.createAll(sc)) val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow]( taskContext, |