commit 3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 (patch)
author    Reynold Xin <rxin@apache.org>        2014-09-06 19:06:30 -0700
committer Matei Zaharia <matei@databricks.com> 2014-09-06 19:06:30 -0700
tree      12db31afe1ca762b95c5b4862a97f55abf0592e2
parent    110fb8b24d2454ad7c979c3934dbed87650f17b8 (diff)
[SPARK-3353] parent stage should have lower stage id.
Previously, parent stages had higher stage ids even though parent stages are executed first. This pull request changes the behavior so that parent stages have lower stage ids. For example, the command:

```scala
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _).count
```

breaks down into 2 stages.

The old web UI:
![screen shot 2014-09-04 at 12 42 44 am](https://cloud.githubusercontent.com/assets/323388/4146177/60fb4f42-3407-11e4-819f-853eb0e22b25.png)

Web UI with this patch:
![screen shot 2014-09-04 at 12 44 55 am](https://cloud.githubusercontent.com/assets/323388/4146178/62e08e62-3407-11e4-867b-a36b10534464.png)

Author: Reynold Xin <rxin@apache.org>

Closes #2273 from rxin/lower-stage-id and squashes the following commits:

abbb4c6 [Reynold Xin] Fixed SparkListenerSuite.
0e02379 [Reynold Xin] Updated DAGSchedulerSuite.
54ccea3 [Reynold Xin] [SPARK-3353] parent stage should have lower stage id.
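The stage ordering in the example can also be checked without the web UI by attaching a listener and recording stage completions. Below is a rough standalone sketch against the Spark 1.x API; `StageOrderCheck` and its local-mode config are illustrative only, and since listener delivery is asynchronous a production check would drain the listener bus before reading the buffer:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits for reduceByKey (Spark 1.x)
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

object StageOrderCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("stage-order"))
    val completed = new ArrayBuffer[Int]
    sc.addSparkListener(new SparkListener {
      // Stages complete in execution order, so the buffer records that order.
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        completed += stageCompleted.stageInfo.stageId
      }
    })
    sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _).count()
    sc.stop()
    // With this patch the shuffle map stage both runs first and carries the
    // lower id, so the expected output is ArrayBuffer(0, 1).
    println(completed)
  }
}
```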
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala  |  2
3 files changed, 19 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2ccc27324a..6fcf9e3154 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -241,9 +241,9 @@ class DAGScheduler(
       callSite: CallSite)
     : Stage =
   {
+    val parentStages = getParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
-    val stage =
-      new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage
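The one-line reorder above matters because `getParentStages` recursively creates, and therefore numbers, any missing ancestor stages; once that call precedes `nextStageId.getAndIncrement()`, every ancestor draws from the counter before the child does. A minimal model of just that ordering (the names here are hypothetical, not Spark's real scheduler):

```scala
import java.util.concurrent.atomic.AtomicInteger

object StageIdModel {
  private val nextStageId = new AtomicInteger(0)

  case class Stage(id: Int, parents: List[Stage])

  // Mirrors the patched newStage: resolve parents first, then take an id.
  def newStage(numParents: Int): Stage = {
    val parents = List.fill(numParents)(newStage(0))  // recursion numbers parents first
    Stage(nextStageId.getAndIncrement(), parents)
  }

  def main(args: Array[String]): Unit = {
    val result = newStage(1)  // one shuffle dependency, as in the reduceByKey example
    val parent = result.parents.head
    assert(parent.id < result.id)
    println(s"parent stage = ${parent.id}, result stage = ${result.id}")  // prints 0 and 1
  }
}
```

Under the old ordering, with the id taken before the parents are resolved, the same run would print `parent stage = 1, result stage = 0`.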
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0bb91febde..aa73469b6a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
   val sparkListener = new SparkListener() {
-    val successfulStages = new HashSet[Int]()
-    val failedStages = new ArrayBuffer[Int]()
+    val successfulStages = new HashSet[Int]
+    val failedStages = new ArrayBuffer[Int]
+    val stageByOrderOfExecution = new ArrayBuffer[Int]
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
+      stageByOrderOfExecution += stageInfo.stageId
       if (stageInfo.failureReason.isEmpty) {
         successfulStages += stageInfo.stageId
       } else {
@@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     runEvent(JobCancelled(jobId))
   }
 
+  test("[SPARK-3353] parent stage should have lower stage id") {
+    sparkListener.stageByOrderOfExecution.clear()
+    sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
+    assert(sparkListener.stageByOrderOfExecution.length === 2)
+    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
+  }
+
   test("zero split job") {
     var numResults = 0
     val fakeListener = new JobListener() {
@@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       null,
       null))
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-    assert(sparkListener.failedStages.contains(0))
+    assert(sparkListener.failedStages.contains(1))
 
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(CompletionEvent(
@@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-    assert(sparkListener.failedStages.contains(1))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages.toSet === Set(0))
 
     assertDataStructuresEmpty
   }
@@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     val stageFailureMessage = "Exception failure in map stage"
     failed(taskSets(0), stageFailureMessage)
 
-    assert(cancelledStages.contains(1))
+    assert(cancelledStages.toSet === Set(0, 2))
 
     // Make sure the listeners got told about both failed stages.
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.successfulStages.isEmpty)
-    assert(sparkListener.failedStages.contains(1))
-    assert(sparkListener.failedStages.contains(3))
-    assert(sparkListener.failedStages.size === 2)
+    assert(sparkListener.failedStages.toSet === Set(0, 2))
 
     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 3b0b8e2f68..ab35e8edc4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     rdd3.count()
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be {2} // Shuffle map stage + result stage
-    val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
+    val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
     stageInfo3.rddInfos.size should be {1} // ShuffledRDD
     stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
     stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
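The hard-coded id in this assertion moves from 2 to 3 because earlier jobs in the same test already consumed ids 0 and 1, and the patch now assigns the lower remaining id (2) to the shuffle map stage, leaving 3 for the result stage that holds the ShuffledRDD. A hypothetical helper (not part of the patch) that would make such assertions immune to renumbering, by locating the stage through an RDD it contains:

```scala
import org.apache.spark.scheduler.StageInfo

// Find a completed stage by the name of an RDD it contains, e.g. "Trois",
// instead of pinning the assertion to a specific stage id.
def findStageByRddName(stageInfos: Iterable[StageInfo], rddName: String): Option[StageInfo] =
  stageInfos.find(_.rddInfos.exists(_.name == rddName))
```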