diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/Stage.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 19 |
1 files changed, 2 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a40b700cdd..b6d4e39fe5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -75,22 +75,6 @@ private[scheduler] abstract class Stage( val name: String = callSite.shortForm val details: String = callSite.longForm - private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty - - /** Internal accumulators shared across all tasks in this stage. */ - def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators - - /** - * Re-initialize the internal accumulators associated with this stage. - * - * This is called every time the stage is submitted, *except* when a subset of tasks - * belonging to this stage has already finished. Otherwise, reinitializing the internal - * accumulators here again will override partial values from the finished tasks. - */ - def resetInternalAccumulators(): Unit = { - _internalAccumulators = InternalAccumulator.create(rdd.sparkContext) - } - /** * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this @@ -127,7 +111,8 @@ private[scheduler] abstract class Stage( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { _latestInfo = StageInfo.fromStage( - this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences) + this, nextAttemptId, Some(numPartitionsToCompute), + InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences) nextAttemptId += 1 } |