aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-02-10 22:34:57 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-10 22:34:57 -0800
commit0fbecc736df95bf757cb497c108ae3dbc5893829 (patch)
tree52e3a1dfd7dc8db8642113564e96afff947478ca /core/src/main/scala
parent226d38840c8d3f40639715d755df6fb4fee2715f (diff)
downloadspark-0fbecc736df95bf757cb497c108ae3dbc5893829.tar.gz
spark-0fbecc736df95bf757cb497c108ae3dbc5893829.tar.bz2
spark-0fbecc736df95bf757cb497c108ae3dbc5893829.zip
[SPARK-19537] Move pendingPartitions to ShuffleMapStage.
The pendingPartitions instance variable should be moved to ShuffleMapStage, because it is only used by ShuffleMapStages. This change is purely refactoring and does not change functionality. I fixed this in an attempt to clarify some of the discussion around #16620, which I was having trouble reasoning about. I stole the helpful comment Imran wrote for pendingPartitions and used it here. cc squito markhamstra jinxing64 Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16876 from kayousterhout/SPARK-19537.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala2
3 files changed, 26 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b9d7e1328d..69101acb3a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -932,8 +932,6 @@ class DAGScheduler(
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
- // Get our pending tasks and remember them in our pendingTasks entry
- stage.pendingPartitions.clear()
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
@@ -1013,9 +1011,11 @@ class DAGScheduler(
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
+ stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
+ stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
@@ -1039,9 +1039,8 @@ class DAGScheduler(
}
if (tasks.size > 0) {
- logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
- stage.pendingPartitions ++= tasks.map(_.partitionId)
- logDebug("New pending partitions: " + stage.pendingPartitions)
+ logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
+ s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1147,7 +1146,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
- stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
@@ -1183,6 +1181,7 @@ class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+ shuffleStage.pendingPartitions -= task.partitionId
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
@@ -1235,7 +1234,14 @@ class DAGScheduler(
case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
- stage.pendingPartitions += task.partitionId
+ stage match {
+ case sms: ShuffleMapStage =>
+ sms.pendingPartitions += task.partitionId
+
+ case _ =>
+ assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
+ "tasks in ShuffleMapStages.")
+ }
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 51416e5ce9..db4d9efa22 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import scala.collection.mutable.HashSet
+
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
@@ -48,6 +50,17 @@ private[spark] class ShuffleMapStage(
private[this] var _numAvailableOutputs: Int = 0
/**
+ * Partitions that either haven't yet been computed, or that were computed on an executor
+ * that has since been lost, so should be re-computed. This variable is used by the
+ * DAGScheduler to determine when a stage has completed. Task successes in both the active
+ * attempt for the stage or in earlier attempts for this stage can cause paritition ids to get
+ * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
+ * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
+ * will always be a subset of the partitions that the TaskSetManager thinks are pending).
+ */
+ val pendingPartitions = new HashSet[Int]
+
+ /**
* List of [[MapStatus]] for each partition. The index of the array is the map partition id,
* and each value in the array is the list of possible [[MapStatus]] for a partition
* (a single task might run multiple times).
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c628dd38d0..c6fc038129 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -67,8 +67,6 @@ private[scheduler] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]
- val pendingPartitions = new HashSet[Int]
-
/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0