Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala')
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5cdc91316b..c27aad268d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -950,13 +950,6 @@ class DAGScheduler(
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
 
-    // Create internal accumulators if the stage has no accumulators initialized.
-    // Reset internal accumulators only if this stage is not partially submitted
-    // Otherwise, we may override existing accumulator values from some tasks
-    if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
-      stage.resetInternalAccumulators()
-    }
-
     // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
     // with this Stage
     val properties = jobIdToActiveJob(jobId).properties
@@ -1036,7 +1029,7 @@
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.internalAccumulators)
+              taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
           }
 
         case stage: ResultStage =>
@@ -1046,7 +1039,7 @@
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, id, stage.internalAccumulators)
+              taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
           }
       }
     } catch {
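
For context, a minimal sketch of the guard this commit deletes, using hypothetical stand-in types (nothing below is Spark's real API; only the accumulator-related names mirror the diff). The old rule reset a stage's internal accumulators only on a full (re)submission, so a partial resubmission kept values already reported by finished tasks.

// Hypothetical stand-in; not Spark's real Stage class.
class SketchStage(val numPartitions: Int, var internalAccumulators: Seq[Long]) {
  def resetInternalAccumulators(): Unit =
    internalAccumulators = Seq.fill(numPartitions)(0L)
}

// The deleted guard: reset when accumulators were never created, or when
// every partition is being computed. If only some partitions are missing
// (a partial resubmission), skip the reset so existing values survive.
def maybeResetAccumulators(stage: SketchStage, partitionsToCompute: Seq[Int]): Unit = {
  if (stage.internalAccumulators.isEmpty ||
      stage.numPartitions == partitionsToCompute.size) {
    stage.resetInternalAccumulators()
  }
}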
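
And a sketch of the wiring after the change, again with hypothetical model classes rather than Spark's own: each task now reads its accumulators from the stage's latest StageInfo, so a fresh stage attempt brings freshly created accumulators and the explicit reset above becomes unnecessary, and each task also carries the submitting job's local properties.

import java.util.Properties

// Hypothetical models; only the names attemptId, internalAccumulators,
// and properties mirror the diff above.
final case class SketchStageInfo(attemptId: Int, internalAccumulators: Seq[Long])
final class SketchStageRef(val id: Int, val latestInfo: SketchStageInfo)

final case class SketchTask(
    stageId: Int,
    stageAttemptId: Int,
    partitionId: Int,
    internalAccumulators: Seq[Long],
    properties: Properties)

def makeTasks(stage: SketchStageRef, partitions: Seq[Int], props: Properties): Seq[SketchTask] =
  partitions.map { id =>
    // Every task of this attempt shares the latest StageInfo's accumulators
    // and the job's local properties.
    SketchTask(stage.id, stage.latestInfo.attemptId, id,
      stage.latestInfo.internalAccumulators, props)
  }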