about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author Kay Ousterhout <kayousterhout@gmail.com> 2015-07-12 20:45:09 -0400
committer Kay Ousterhout <kayousterhout@gmail.com> 2015-07-12 20:45:24 -0400
commit 30090884f96efde72d9653f2253070b68f87782c (patch)
tree 5a377c6a4992a89019c48d3c142b7f5a86106528 /core
parent c472eb17ae7f0910f304e414ea5ccbb77a9e153a (diff)
download spark-30090884f96efde72d9653f2253070b68f87782c.tar.gz
spark-30090884f96efde72d9653f2253070b68f87782c.tar.bz2
spark-30090884f96efde72d9653f2253070b68f87782c.zip
[SPARK-8880] Fix confusing Stage.attemptId member variable
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #7275 from kayousterhout/SPARK-8880 and squashes the following commits:

3e9ce7c [Kay Ousterhout] Added missing return type
e150278 [Kay Ousterhout] [SPARK-8880] Fix confusing Stage.attemptId member variable
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 6
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/Stage.scala 20
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala 4
3 files changed, 18 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6841fa8357..f3d87ee5c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -872,7 +872,7 @@ class DAGScheduler(
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
- stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+ stage.makeNewStageAttempt(partitionsToCompute.size)
outputCommitCoordinator.stageStart(stage.id)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
@@ -937,8 +937,8 @@ class DAGScheduler(
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
- taskScheduler.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
+ taskScheduler.submitTasks(new TaskSet(
+ tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c59d6e4f5b..b86724de2c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -62,22 +62,28 @@ private[spark] abstract class Stage(
var pendingTasks = new HashSet[Task[_]]
+ /** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
val name = callSite.shortForm
val details = callSite.longForm
- /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
- var latestInfo: StageInfo = StageInfo.fromStage(this)
+ /**
+ * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+ * here, before any attempts have actually been created, because the DAGScheduler uses this
+ * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
+ * have been created).
+ */
+ private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
- /** Return a new attempt id, starting with 0. */
- def newAttemptId(): Int = {
- val id = nextAttemptId
+ /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
+ def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
+ _latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
nextAttemptId += 1
- id
}
- def attemptId: Int = nextAttemptId
+ /** Returns the StageInfo for the most recent attempt for this stage. */
+ def latestInfo: StageInfo = _latestInfo
override final def hashCode(): Int = id
override final def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index e439d2a7e1..5d2abbc67e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -70,12 +70,12 @@ private[spark] object StageInfo {
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
- def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
+ def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(
stage.id,
- stage.attemptId,
+ attemptId,
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,